001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import com.google.protobuf.Message; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.RawCellBuilder; 041import org.apache.hadoop.hbase.RawCellBuilderFactory; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.SharedConnection; 044import org.apache.hadoop.hbase.client.Append; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.client.Delete; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Increment; 050import org.apache.hadoop.hbase.client.Mutation; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 057import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 061import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 062import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 063import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 064import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 065import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 066import org.apache.hadoop.hbase.coprocessor.ObserverContext; 067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 069import org.apache.hadoop.hbase.coprocessor.RegionObserver; 070import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; 071import org.apache.hadoop.hbase.filter.ByteArrayComparable; 072import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 073import org.apache.hadoop.hbase.io.Reference; 074import org.apache.hadoop.hbase.io.hfile.CacheConfig; 075import org.apache.hadoop.hbase.metrics.MetricRegistry; 076import org.apache.hadoop.hbase.regionserver.Region.Operation; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 079import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 080import org.apache.hadoop.hbase.security.User; 081import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 082import org.apache.hadoop.hbase.util.Pair; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALKey; 085import org.apache.yetus.audience.InterfaceAudience; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 090import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 091 092/** 093 * Implements the coprocessor environment and runtime support for coprocessors 094 * loaded within a {@link Region}. 095 */ 096@InterfaceAudience.Private 097public class RegionCoprocessorHost 098 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 099 100 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 101 // The shared data map 102 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 103 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 104 AbstractReferenceMap.ReferenceStrength.WEAK); 105 106 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 107 private final boolean hasCustomPostScannerFilterRow; 108 109 /** 110 * 111 * Encapsulation of the environment of each coprocessor 112 */ 113 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 114 implements RegionCoprocessorEnvironment { 115 private Region region; 116 ConcurrentMap<String, Object> sharedData; 117 private final MetricRegistry metricRegistry; 118 private final RegionServerServices services; 119 120 /** 121 * Constructor 122 * @param impl the coprocessor instance 123 * @param priority chaining priority 124 */ 125 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 126 final int seq, final Configuration conf, final Region region, 127 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 128 super(impl, priority, seq, conf); 129 this.region = region; 130 this.sharedData = sharedData; 131 this.services = services; 132 this.metricRegistry = 133 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 134 } 135 136 /** @return the region */ 137 @Override 138 public Region getRegion() { 139 return region; 140 } 141 142 @Override 143 public OnlineRegions getOnlineRegions() { 144 return this.services; 145 } 146 147 @Override 148 public Connection getConnection() { 149 // Mocks may have services as null at test time. 150 return services != null ? new SharedConnection(services.getConnection()) : null; 151 } 152 153 @Override 154 public Connection createConnection(Configuration conf) throws IOException { 155 return services != null ? this.services.createConnection(conf) : null; 156 } 157 158 @Override 159 public ServerName getServerName() { 160 return services != null? services.getServerName(): null; 161 } 162 163 @Override 164 public void shutdown() { 165 super.shutdown(); 166 MetricsCoprocessor.removeRegistry(this.metricRegistry); 167 } 168 169 @Override 170 public ConcurrentMap<String, Object> getSharedData() { 171 return sharedData; 172 } 173 174 @Override 175 public RegionInfo getRegionInfo() { 176 return region.getRegionInfo(); 177 } 178 179 @Override 180 public MetricRegistry getMetricRegistryForRegionServer() { 181 return metricRegistry; 182 } 183 184 @Override 185 public RawCellBuilder getCellBuilder() { 186 // We always do a DEEP_COPY only 187 return RawCellBuilderFactory.create(); 188 } 189 } 190 191 /** 192 * Special version of RegionEnvironment that exposes RegionServerServices for Core 193 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 194 */ 195 private static class RegionEnvironmentForCoreCoprocessors extends 196 RegionEnvironment implements HasRegionServerServices { 197 private final RegionServerServices rsServices; 198 199 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 200 final int seq, final Configuration conf, final Region region, 201 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 202 super(impl, priority, seq, conf, region, services, sharedData); 203 this.rsServices = services; 204 } 205 206 /** 207 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 208 * consumption. 209 */ 210 @Override 211 public RegionServerServices getRegionServerServices() { 212 return this.rsServices; 213 } 214 } 215 216 static class TableCoprocessorAttribute { 217 private Path path; 218 private String className; 219 private int priority; 220 private Configuration conf; 221 222 public TableCoprocessorAttribute(Path path, String className, int priority, 223 Configuration conf) { 224 this.path = path; 225 this.className = className; 226 this.priority = priority; 227 this.conf = conf; 228 } 229 230 public Path getPath() { 231 return path; 232 } 233 234 public String getClassName() { 235 return className; 236 } 237 238 public int getPriority() { 239 return priority; 240 } 241 242 public Configuration getConf() { 243 return conf; 244 } 245 } 246 247 /** The region server services */ 248 RegionServerServices rsServices; 249 /** The region */ 250 HRegion region; 251 252 /** 253 * Constructor 254 * @param region the region 255 * @param rsServices interface to available region server functionality 256 * @param conf the configuration 257 */ 258 public RegionCoprocessorHost(final HRegion region, 259 final RegionServerServices rsServices, final Configuration conf) { 260 super(rsServices); 261 this.conf = conf; 262 this.rsServices = rsServices; 263 this.region = region; 264 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 265 266 // load system default cp's from configuration. 267 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 268 269 // load system default cp's for user tables from configuration. 270 if (!region.getRegionInfo().getTable().isSystemTable()) { 271 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 272 } 273 274 // load Coprocessor From HDFS 275 loadTableCoprocessors(conf); 276 277 // now check whether any coprocessor implements postScannerFilterRow 278 boolean hasCustomPostScannerFilterRow = false; 279 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 280 if (env.getInstance() instanceof RegionObserver) { 281 Class<?> clazz = env.getInstance().getClass(); 282 for(;;) { 283 if (clazz == null) { 284 // we must have directly implemented RegionObserver 285 hasCustomPostScannerFilterRow = true; 286 break out; 287 } 288 try { 289 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 290 InternalScanner.class, Cell.class, boolean.class); 291 // this coprocessor has a custom version of postScannerFilterRow 292 hasCustomPostScannerFilterRow = true; 293 break out; 294 } catch (NoSuchMethodException ignore) { 295 } 296 // the deprecated signature still exists 297 try { 298 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 299 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 300 // this coprocessor has a custom version of postScannerFilterRow 301 hasCustomPostScannerFilterRow = true; 302 break out; 303 } catch (NoSuchMethodException ignore) { 304 } 305 clazz = clazz.getSuperclass(); 306 } 307 } 308 } 309 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 310 } 311 312 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 313 TableDescriptor htd) { 314 return htd.getCoprocessorDescriptors().stream().map(cp -> { 315 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 316 Configuration ourConf; 317 if (!cp.getProperties().isEmpty()) { 318 // do an explicit deep copy of the passed configuration 319 ourConf = new Configuration(false); 320 HBaseConfiguration.merge(ourConf, conf); 321 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 322 } else { 323 ourConf = conf; 324 } 325 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 326 }).collect(Collectors.toList()); 327 } 328 329 /** 330 * Sanity check the table coprocessor attributes of the supplied schema. Will 331 * throw an exception if there is a problem. 332 * @param conf 333 * @param htd 334 * @throws IOException 335 */ 336 public static void testTableCoprocessorAttrs(final Configuration conf, 337 final TableDescriptor htd) throws IOException { 338 String pathPrefix = UUID.randomUUID().toString(); 339 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 340 if (attr.getPriority() < 0) { 341 throw new IOException("Priority for coprocessor " + attr.getClassName() + 342 " cannot be less than 0"); 343 } 344 ClassLoader old = Thread.currentThread().getContextClassLoader(); 345 try { 346 ClassLoader cl; 347 if (attr.getPath() != null) { 348 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 349 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 350 } else { 351 cl = CoprocessorHost.class.getClassLoader(); 352 } 353 Thread.currentThread().setContextClassLoader(cl); 354 if (cl instanceof CoprocessorClassLoader) { 355 String[] includedClassPrefixes = null; 356 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 357 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 358 includedClassPrefixes = prefixes.split(";"); 359 } 360 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); 361 } else { 362 cl.loadClass(attr.getClassName()); 363 } 364 } catch (ClassNotFoundException e) { 365 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 366 } finally { 367 Thread.currentThread().setContextClassLoader(old); 368 } 369 } 370 } 371 372 void loadTableCoprocessors(final Configuration conf) { 373 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 374 DEFAULT_COPROCESSORS_ENABLED); 375 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 376 DEFAULT_USER_COPROCESSORS_ENABLED); 377 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 378 return; 379 } 380 381 // scan the table attributes for coprocessor load specifications 382 // initialize the coprocessors 383 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 384 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 385 region.getTableDescriptor())) { 386 // Load encompasses classloading and coprocessor initialization 387 try { 388 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 389 attr.getPriority(), attr.getConf()); 390 if (env == null) { 391 continue; 392 } 393 configured.add(env); 394 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 395 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 396 } catch (Throwable t) { 397 // Coprocessor failed to load, do we abort on error? 398 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 399 abortServer(attr.getClassName(), t); 400 } else { 401 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 402 } 403 } 404 } 405 // add together to coprocessor set for COW efficiency 406 coprocEnvironments.addAll(configured); 407 } 408 409 @Override 410 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 411 Configuration conf) { 412 // If coprocessor exposes any services, register them. 413 for (Service service : instance.getServices()) { 414 region.registerService(service); 415 } 416 ConcurrentMap<String, Object> classData; 417 // make sure only one thread can add maps 418 synchronized (SHARED_DATA_MAP) { 419 // as long as at least one RegionEnvironment holds on to its classData it will 420 // remain in this map 421 classData = 422 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 423 k -> new ConcurrentHashMap<>()); 424 } 425 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 426 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 427 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 428 rsServices, classData): 429 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 430 } 431 432 @Override 433 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 434 throws InstantiationException, IllegalAccessException { 435 try { 436 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 437 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 438 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 439 // For backward compatibility with old CoprocessorService impl which don't extend 440 // RegionCoprocessor. 441 CoprocessorService cs; 442 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 443 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 444 } else { 445 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 446 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 447 return null; 448 } 449 } catch (NoSuchMethodException | InvocationTargetException e) { 450 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 451 } 452 } 453 454 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 455 RegionCoprocessor::getRegionObserver; 456 457 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 458 RegionCoprocessor::getEndpointObserver; 459 460 abstract class RegionObserverOperationWithoutResult extends 461 ObserverOperationWithoutResult<RegionObserver> { 462 public RegionObserverOperationWithoutResult() { 463 super(regionObserverGetter); 464 } 465 466 public RegionObserverOperationWithoutResult(User user) { 467 super(regionObserverGetter, user); 468 } 469 470 public RegionObserverOperationWithoutResult(boolean bypassable) { 471 super(regionObserverGetter, null, bypassable); 472 } 473 474 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 475 super(regionObserverGetter, user, bypassable); 476 } 477 } 478 479 abstract class BulkLoadObserverOperation extends 480 ObserverOperationWithoutResult<BulkLoadObserver> { 481 public BulkLoadObserverOperation(User user) { 482 super(RegionCoprocessor::getBulkLoadObserver, user); 483 } 484 } 485 486 487 ////////////////////////////////////////////////////////////////////////////////////////////////// 488 // Observer operations 489 ////////////////////////////////////////////////////////////////////////////////////////////////// 490 491 ////////////////////////////////////////////////////////////////////////////////////////////////// 492 // Observer operations 493 ////////////////////////////////////////////////////////////////////////////////////////////////// 494 495 /** 496 * Invoked before a region open. 497 * 498 * @throws IOException Signals that an I/O exception has occurred. 499 */ 500 public void preOpen() throws IOException { 501 if (coprocEnvironments.isEmpty()) { 502 return; 503 } 504 execOperation(new RegionObserverOperationWithoutResult() { 505 @Override 506 public void call(RegionObserver observer) throws IOException { 507 observer.preOpen(this); 508 } 509 }); 510 } 511 512 513 /** 514 * Invoked after a region open 515 */ 516 public void postOpen() { 517 if (coprocEnvironments.isEmpty()) { 518 return; 519 } 520 try { 521 execOperation(new RegionObserverOperationWithoutResult() { 522 @Override 523 public void call(RegionObserver observer) throws IOException { 524 observer.postOpen(this); 525 } 526 }); 527 } catch (IOException e) { 528 LOG.warn(e.toString(), e); 529 } 530 } 531 532 /** 533 * Invoked before a region is closed 534 * @param abortRequested true if the server is aborting 535 */ 536 public void preClose(final boolean abortRequested) throws IOException { 537 execOperation(new RegionObserverOperationWithoutResult() { 538 @Override 539 public void call(RegionObserver observer) throws IOException { 540 observer.preClose(this, abortRequested); 541 } 542 }); 543 } 544 545 /** 546 * Invoked after a region is closed 547 * @param abortRequested true if the server is aborting 548 */ 549 public void postClose(final boolean abortRequested) { 550 try { 551 execOperation(new RegionObserverOperationWithoutResult() { 552 @Override 553 public void call(RegionObserver observer) throws IOException { 554 observer.postClose(this, abortRequested); 555 } 556 557 @Override 558 public void postEnvCall() { 559 shutdown(this.getEnvironment()); 560 } 561 }); 562 } catch (IOException e) { 563 LOG.warn(e.toString(), e); 564 } 565 } 566 567 /** 568 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 569 * available candidates. 570 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 571 * the passed in <code>candidates</code>. 572 * @param store The store where compaction is being requested 573 * @param candidates The currently available store files 574 * @param tracker used to track the life cycle of a compaction 575 * @param user the user 576 * @throws IOException 577 */ 578 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 579 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 580 if (coprocEnvironments.isEmpty()) { 581 return false; 582 } 583 boolean bypassable = true; 584 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 585 @Override 586 public void call(RegionObserver observer) throws IOException { 587 observer.preCompactSelection(this, store, candidates, tracker); 588 } 589 }); 590 } 591 592 /** 593 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 594 * candidates. 595 * @param store The store where compaction is being requested 596 * @param selected The store files selected to compact 597 * @param tracker used to track the life cycle of a compaction 598 * @param request the compaction request 599 * @param user the user 600 */ 601 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 602 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 603 final User user) throws IOException { 604 if (coprocEnvironments.isEmpty()) { 605 return; 606 } 607 execOperation(new RegionObserverOperationWithoutResult(user) { 608 @Override 609 public void call(RegionObserver observer) throws IOException { 610 observer.postCompactSelection(this, store, selected, tracker, request); 611 } 612 }); 613 } 614 615 /** 616 * Called prior to opening store scanner for compaction. 617 */ 618 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 619 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 620 if (coprocEnvironments.isEmpty()) { 621 return store.getScanInfo(); 622 } 623 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 624 execOperation(new RegionObserverOperationWithoutResult(user) { 625 @Override 626 public void call(RegionObserver observer) throws IOException { 627 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 628 } 629 }); 630 return builder.build(); 631 } 632 633 /** 634 * Called prior to rewriting the store files selected for compaction 635 * @param store the store being compacted 636 * @param scanner the scanner used to read store data during compaction 637 * @param scanType type of Scan 638 * @param tracker used to track the life cycle of a compaction 639 * @param request the compaction request 640 * @param user the user 641 * @return Scanner to use (cannot be null!) 642 * @throws IOException 643 */ 644 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 645 final ScanType scanType, final CompactionLifeCycleTracker tracker, 646 final CompactionRequest request, final User user) throws IOException { 647 InternalScanner defaultResult = scanner; 648 if (coprocEnvironments.isEmpty()) { 649 return defaultResult; 650 } 651 return execOperationWithResult( 652 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 653 defaultResult, user) { 654 @Override 655 public InternalScanner call(RegionObserver observer) throws IOException { 656 InternalScanner scanner = 657 observer.preCompact(this, store, getResult(), scanType, tracker, request); 658 if (scanner == null) { 659 throw new CoprocessorException("Null Scanner return disallowed!"); 660 } 661 return scanner; 662 } 663 }); 664 } 665 666 /** 667 * Called after the store compaction has completed. 668 * @param store the store being compacted 669 * @param resultFile the new store file written during compaction 670 * @param tracker used to track the life cycle of a compaction 671 * @param request the compaction request 672 * @param user the user 673 * @throws IOException 674 */ 675 public void postCompact(final HStore store, final HStoreFile resultFile, 676 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 677 throws IOException { 678 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 679 @Override 680 public void call(RegionObserver observer) throws IOException { 681 observer.postCompact(this, store, resultFile, tracker, request); 682 } 683 }); 684 } 685 686 /** 687 * Invoked before create StoreScanner for flush. 688 */ 689 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 690 throws IOException { 691 if (coprocEnvironments.isEmpty()) { 692 return store.getScanInfo(); 693 } 694 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 695 execOperation(new RegionObserverOperationWithoutResult() { 696 @Override 697 public void call(RegionObserver observer) throws IOException { 698 observer.preFlushScannerOpen(this, store, builder, tracker); 699 } 700 }); 701 return builder.build(); 702 } 703 704 /** 705 * Invoked before a memstore flush 706 * @return Scanner to use (cannot be null!) 707 * @throws IOException 708 */ 709 public InternalScanner preFlush(HStore store, InternalScanner scanner, 710 FlushLifeCycleTracker tracker) throws IOException { 711 if (coprocEnvironments.isEmpty()) { 712 return scanner; 713 } 714 return execOperationWithResult( 715 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 716 @Override 717 public InternalScanner call(RegionObserver observer) throws IOException { 718 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 719 if (scanner == null) { 720 throw new CoprocessorException("Null Scanner return disallowed!"); 721 } 722 return scanner; 723 } 724 }); 725 } 726 727 /** 728 * Invoked before a memstore flush 729 * @throws IOException 730 */ 731 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 732 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 733 @Override 734 public void call(RegionObserver observer) throws IOException { 735 observer.preFlush(this, tracker); 736 } 737 }); 738 } 739 740 /** 741 * Invoked after a memstore flush 742 * @throws IOException 743 */ 744 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 745 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 746 @Override 747 public void call(RegionObserver observer) throws IOException { 748 observer.postFlush(this, tracker); 749 } 750 }); 751 } 752 753 /** 754 * Invoked before in memory compaction. 755 */ 756 public void preMemStoreCompaction(HStore store) throws IOException { 757 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 758 @Override 759 public void call(RegionObserver observer) throws IOException { 760 observer.preMemStoreCompaction(this, store); 761 } 762 }); 763 } 764 765 /** 766 * Invoked before create StoreScanner for in memory compaction. 767 */ 768 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 769 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 770 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 771 @Override 772 public void call(RegionObserver observer) throws IOException { 773 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 774 } 775 }); 776 return builder.build(); 777 } 778 779 /** 780 * Invoked before compacting memstore. 781 */ 782 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 783 throws IOException { 784 if (coprocEnvironments.isEmpty()) { 785 return scanner; 786 } 787 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 788 regionObserverGetter, scanner) { 789 @Override 790 public InternalScanner call(RegionObserver observer) throws IOException { 791 return observer.preMemStoreCompactionCompact(this, store, getResult()); 792 } 793 }); 794 } 795 796 /** 797 * Invoked after in memory compaction. 798 */ 799 public void postMemStoreCompaction(HStore store) throws IOException { 800 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 801 @Override 802 public void call(RegionObserver observer) throws IOException { 803 observer.postMemStoreCompaction(this, store); 804 } 805 }); 806 } 807 808 /** 809 * Invoked after a memstore flush 810 * @throws IOException 811 */ 812 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 813 throws IOException { 814 if (coprocEnvironments.isEmpty()) { 815 return; 816 } 817 execOperation(new RegionObserverOperationWithoutResult() { 818 @Override 819 public void call(RegionObserver observer) throws IOException { 820 observer.postFlush(this, store, storeFile, tracker); 821 } 822 }); 823 } 824 825 // RegionObserver support 826 /** 827 * Supports Coprocessor 'bypass'. 828 * @param get the Get request 829 * @param results What to return if return is true/'bypass'. 830 * @return true if default processing should be bypassed. 831 * @exception IOException Exception 832 */ 833 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 834 if (coprocEnvironments.isEmpty()) { 835 return false; 836 } 837 boolean bypassable = true; 838 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 839 @Override 840 public void call(RegionObserver observer) throws IOException { 841 observer.preGetOp(this, get, results); 842 } 843 }); 844 } 845 846 /** 847 * @param get the Get request 848 * @param results the result set 849 * @exception IOException Exception 850 */ 851 public void postGet(final Get get, final List<Cell> results) 852 throws IOException { 853 if (coprocEnvironments.isEmpty()) { 854 return; 855 } 856 execOperation(new RegionObserverOperationWithoutResult() { 857 @Override 858 public void call(RegionObserver observer) throws IOException { 859 observer.postGetOp(this, get, results); 860 } 861 }); 862 } 863 864 /** 865 * Supports Coprocessor 'bypass'. 866 * @param get the Get request 867 * @return true or false to return to client if bypassing normal operation, or null otherwise 868 * @exception IOException Exception 869 */ 870 public Boolean preExists(final Get get) throws IOException { 871 boolean bypassable = true; 872 boolean defaultResult = false; 873 if (coprocEnvironments.isEmpty()) { 874 return null; 875 } 876 return execOperationWithResult( 877 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 878 defaultResult, bypassable) { 879 @Override 880 public Boolean call(RegionObserver observer) throws IOException { 881 return observer.preExists(this, get, getResult()); 882 } 883 }); 884 } 885 886 /** 887 * @param get the Get request 888 * @param result the result returned by the region server 889 * @return the result to return to the client 890 * @exception IOException Exception 891 */ 892 public boolean postExists(final Get get, boolean result) 893 throws IOException { 894 if (this.coprocEnvironments.isEmpty()) { 895 return result; 896 } 897 return execOperationWithResult( 898 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 899 @Override 900 public Boolean call(RegionObserver observer) throws IOException { 901 return observer.postExists(this, get, getResult()); 902 } 903 }); 904 } 905 906 /** 907 * Supports Coprocessor 'bypass'. 908 * @param put The Put object 909 * @param edit The WALEdit object. 910 * @param durability The durability used 911 * @return true if default processing should be bypassed 912 * @exception IOException Exception 913 */ 914 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 915 throws IOException { 916 if (coprocEnvironments.isEmpty()) { 917 return false; 918 } 919 boolean bypassable = true; 920 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 921 @Override 922 public void call(RegionObserver observer) throws IOException { 923 observer.prePut(this, put, edit, durability); 924 } 925 }); 926 } 927 928 /** 929 * Supports Coprocessor 'bypass'. 930 * @param mutation - the current mutation 931 * @param kv - the current cell 932 * @param byteNow - current timestamp in bytes 933 * @param get - the get that could be used 934 * Note that the get only does not specify the family and qualifier that should be used 935 * @return true if default processing should be bypassed 936 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 937 * Coprocessor for its needs only. Will be removed. 938 */ 939 @Deprecated 940 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 941 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 942 if (coprocEnvironments.isEmpty()) { 943 return false; 944 } 945 boolean bypassable = true; 946 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 947 @Override 948 public void call(RegionObserver observer) throws IOException { 949 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 950 } 951 }); 952 } 953 954 /** 955 * @param put The Put object 956 * @param edit The WALEdit object. 957 * @param durability The durability used 958 * @exception IOException Exception 959 */ 960 public void postPut(final Put put, final WALEdit edit, final Durability durability) 961 throws IOException { 962 if (coprocEnvironments.isEmpty()) { 963 return; 964 } 965 execOperation(new RegionObserverOperationWithoutResult() { 966 @Override 967 public void call(RegionObserver observer) throws IOException { 968 observer.postPut(this, put, edit, durability); 969 } 970 }); 971 } 972 973 /** 974 * Supports Coprocessor 'bypass'. 975 * @param delete The Delete object 976 * @param edit The WALEdit object. 977 * @param durability The durability used 978 * @return true if default processing should be bypassed 979 * @exception IOException Exception 980 */ 981 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 982 throws IOException { 983 if (this.coprocEnvironments.isEmpty()) { 984 return false; 985 } 986 boolean bypassable = true; 987 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 988 @Override 989 public void call(RegionObserver observer) throws IOException { 990 observer.preDelete(this, delete, edit, durability); 991 } 992 }); 993 } 994 995 /** 996 * @param delete The Delete object 997 * @param edit The WALEdit object. 998 * @param durability The durability used 999 * @exception IOException Exception 1000 */ 1001 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 1002 throws IOException { 1003 execOperation(coprocEnvironments.isEmpty()? null: 1004 new RegionObserverOperationWithoutResult() { 1005 @Override 1006 public void call(RegionObserver observer) throws IOException { 1007 observer.postDelete(this, delete, edit, durability); 1008 } 1009 }); 1010 } 1011 1012 public void preBatchMutate( 1013 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1014 if(this.coprocEnvironments.isEmpty()) { 1015 return; 1016 } 1017 execOperation(new RegionObserverOperationWithoutResult() { 1018 @Override 1019 public void call(RegionObserver observer) throws IOException { 1020 observer.preBatchMutate(this, miniBatchOp); 1021 } 1022 }); 1023 } 1024 1025 public void postBatchMutate( 1026 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1027 if (this.coprocEnvironments.isEmpty()) { 1028 return; 1029 } 1030 execOperation(new RegionObserverOperationWithoutResult() { 1031 @Override 1032 public void call(RegionObserver observer) throws IOException { 1033 observer.postBatchMutate(this, miniBatchOp); 1034 } 1035 }); 1036 } 1037 1038 public void postBatchMutateIndispensably( 1039 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1040 throws IOException { 1041 if (this.coprocEnvironments.isEmpty()) { 1042 return; 1043 } 1044 execOperation(new RegionObserverOperationWithoutResult() { 1045 @Override 1046 public void call(RegionObserver observer) throws IOException { 1047 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1048 } 1049 }); 1050 } 1051 1052 /** 1053 * Supports Coprocessor 'bypass'. 1054 * @param row row to check 1055 * @param family column family 1056 * @param qualifier column qualifier 1057 * @param op the comparison operation 1058 * @param comparator the comparator 1059 * @param put data to put if check succeeds 1060 * @return true or false to return to client if default processing should be bypassed, or null 1061 * otherwise 1062 */ 1063 public Boolean preCheckAndPut(final byte [] row, final byte [] family, 1064 final byte [] qualifier, final CompareOperator op, 1065 final ByteArrayComparable comparator, final Put put) 1066 throws IOException { 1067 boolean bypassable = true; 1068 boolean defaultResult = false; 1069 if (coprocEnvironments.isEmpty()) { 1070 return null; 1071 } 1072 return execOperationWithResult( 1073 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1074 defaultResult, bypassable) { 1075 @Override 1076 public Boolean call(RegionObserver observer) throws IOException { 1077 return observer.preCheckAndPut(this, row, family, qualifier, 1078 op, comparator, put, getResult()); 1079 } 1080 }); 1081 } 1082 1083 /** 1084 * Supports Coprocessor 'bypass'. 1085 * @param row row to check 1086 * @param family column family 1087 * @param qualifier column qualifier 1088 * @param op the comparison operation 1089 * @param comparator the comparator 1090 * @param put data to put if check succeeds 1091 * @return true or false to return to client if default processing should be bypassed, or null 1092 * otherwise 1093 */ 1094 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1095 justification="Null is legit") 1096 public Boolean preCheckAndPutAfterRowLock( 1097 final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, 1098 final ByteArrayComparable comparator, final Put put) throws IOException { 1099 boolean bypassable = true; 1100 boolean defaultResult = false; 1101 if (coprocEnvironments.isEmpty()) { 1102 return null; 1103 } 1104 return execOperationWithResult( 1105 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1106 defaultResult, bypassable) { 1107 @Override 1108 public Boolean call(RegionObserver observer) throws IOException { 1109 return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, 1110 op, comparator, put, getResult()); 1111 } 1112 }); 1113 } 1114 1115 /** 1116 * @param row row to check 1117 * @param family column family 1118 * @param qualifier column qualifier 1119 * @param op the comparison operation 1120 * @param comparator the comparator 1121 * @param put data to put if check succeeds 1122 * @throws IOException e 1123 */ 1124 public boolean postCheckAndPut(final byte [] row, final byte [] family, 1125 final byte [] qualifier, final CompareOperator op, 1126 final ByteArrayComparable comparator, final Put put, 1127 boolean result) throws IOException { 1128 if (this.coprocEnvironments.isEmpty()) { 1129 return result; 1130 } 1131 return execOperationWithResult( 1132 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1133 @Override 1134 public Boolean call(RegionObserver observer) throws IOException { 1135 return observer.postCheckAndPut(this, row, family, qualifier, 1136 op, comparator, put, getResult()); 1137 } 1138 }); 1139 } 1140 1141 /** 1142 * Supports Coprocessor 'bypass'. 1143 * @param row row to check 1144 * @param family column family 1145 * @param qualifier column qualifier 1146 * @param op the comparison operation 1147 * @param comparator the comparator 1148 * @param delete delete to commit if check succeeds 1149 * @return true or false to return to client if default processing should be bypassed, 1150 * or null otherwise 1151 */ 1152 public Boolean preCheckAndDelete(final byte [] row, final byte [] family, 1153 final byte [] qualifier, final CompareOperator op, 1154 final ByteArrayComparable comparator, final Delete delete) 1155 throws IOException { 1156 boolean bypassable = true; 1157 boolean defaultResult = false; 1158 if (coprocEnvironments.isEmpty()) { 1159 return null; 1160 } 1161 return execOperationWithResult( 1162 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1163 defaultResult, bypassable) { 1164 @Override 1165 public Boolean call(RegionObserver observer) throws IOException { 1166 return observer.preCheckAndDelete(this, row, family, 1167 qualifier, op, comparator, delete, getResult()); 1168 } 1169 }); 1170 } 1171 1172 /** 1173 * Supports Coprocessor 'bypass'. 1174 * @param row row to check 1175 * @param family column family 1176 * @param qualifier column qualifier 1177 * @param op the comparison operation 1178 * @param comparator the comparator 1179 * @param delete delete to commit if check succeeds 1180 * @return true or false to return to client if default processing should be bypassed, 1181 * or null otherwise 1182 */ 1183 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1184 justification="Null is legit") 1185 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, 1186 final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, 1187 final Delete delete) throws IOException { 1188 boolean bypassable = true; 1189 boolean defaultResult = false; 1190 if (coprocEnvironments.isEmpty()) { 1191 return null; 1192 } 1193 return execOperationWithResult( 1194 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1195 defaultResult, bypassable) { 1196 @Override 1197 public Boolean call(RegionObserver observer) throws IOException { 1198 return observer.preCheckAndDeleteAfterRowLock(this, row, 1199 family, qualifier, op, comparator, delete, getResult()); 1200 } 1201 }); 1202 } 1203 1204 /** 1205 * @param row row to check 1206 * @param family column family 1207 * @param qualifier column qualifier 1208 * @param op the comparison operation 1209 * @param comparator the comparator 1210 * @param delete delete to commit if check succeeds 1211 * @throws IOException e 1212 */ 1213 public boolean postCheckAndDelete(final byte [] row, final byte [] family, 1214 final byte [] qualifier, final CompareOperator op, 1215 final ByteArrayComparable comparator, final Delete delete, 1216 boolean result) throws IOException { 1217 if (this.coprocEnvironments.isEmpty()) { 1218 return result; 1219 } 1220 return execOperationWithResult( 1221 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1222 @Override 1223 public Boolean call(RegionObserver observer) throws IOException { 1224 return observer.postCheckAndDelete(this, row, family, 1225 qualifier, op, comparator, delete, getResult()); 1226 } 1227 }); 1228 } 1229 1230 /** 1231 * Supports Coprocessor 'bypass'. 1232 * @param append append object 1233 * @return result to return to client if default operation should be bypassed, null otherwise 1234 * @throws IOException if an error occurred on the coprocessor 1235 */ 1236 public Result preAppend(final Append append) throws IOException { 1237 boolean bypassable = true; 1238 Result defaultResult = null; 1239 if (this.coprocEnvironments.isEmpty()) { 1240 return defaultResult; 1241 } 1242 return execOperationWithResult( 1243 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1244 bypassable) { 1245 @Override 1246 public Result call(RegionObserver observer) throws IOException { 1247 return observer.preAppend(this, append); 1248 } 1249 }); 1250 } 1251 1252 /** 1253 * Supports Coprocessor 'bypass'. 1254 * @param append append object 1255 * @return result to return to client if default operation should be bypassed, null otherwise 1256 * @throws IOException if an error occurred on the coprocessor 1257 */ 1258 public Result preAppendAfterRowLock(final Append append) throws IOException { 1259 boolean bypassable = true; 1260 Result defaultResult = null; 1261 if (this.coprocEnvironments.isEmpty()) { 1262 return defaultResult; 1263 } 1264 return execOperationWithResult( 1265 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1266 defaultResult, bypassable) { 1267 @Override 1268 public Result call(RegionObserver observer) throws IOException { 1269 return observer.preAppendAfterRowLock(this, append); 1270 } 1271 }); 1272 } 1273 1274 /** 1275 * Supports Coprocessor 'bypass'. 1276 * @param increment increment object 1277 * @return result to return to client if default operation should be bypassed, null otherwise 1278 * @throws IOException if an error occurred on the coprocessor 1279 */ 1280 public Result preIncrement(final Increment increment) throws IOException { 1281 boolean bypassable = true; 1282 Result defaultResult = null; 1283 if (coprocEnvironments.isEmpty()) { 1284 return defaultResult; 1285 } 1286 return execOperationWithResult( 1287 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1288 bypassable) { 1289 @Override 1290 public Result call(RegionObserver observer) throws IOException { 1291 return observer.preIncrement(this, increment); 1292 } 1293 }); 1294 } 1295 1296 /** 1297 * Supports Coprocessor 'bypass'. 1298 * @param increment increment object 1299 * @return result to return to client if default operation should be bypassed, null otherwise 1300 * @throws IOException if an error occurred on the coprocessor 1301 */ 1302 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1303 boolean bypassable = true; 1304 Result defaultResult = null; 1305 if (coprocEnvironments.isEmpty()) { 1306 return defaultResult; 1307 } 1308 return execOperationWithResult( 1309 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1310 bypassable) { 1311 @Override 1312 public Result call(RegionObserver observer) throws IOException { 1313 return observer.preIncrementAfterRowLock(this, increment); 1314 } 1315 }); 1316 } 1317 1318 /** 1319 * @param append Append object 1320 * @param result the result returned by the append 1321 * @throws IOException if an error occurred on the coprocessor 1322 */ 1323 public Result postAppend(final Append append, final Result result) throws IOException { 1324 if (this.coprocEnvironments.isEmpty()) { 1325 return result; 1326 } 1327 return execOperationWithResult( 1328 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1329 @Override 1330 public Result call(RegionObserver observer) throws IOException { 1331 return observer.postAppend(this, append, result); 1332 } 1333 }); 1334 } 1335 1336 /** 1337 * @param increment increment object 1338 * @param result the result returned by postIncrement 1339 * @throws IOException if an error occurred on the coprocessor 1340 */ 1341 public Result postIncrement(final Increment increment, Result result) throws IOException { 1342 if (this.coprocEnvironments.isEmpty()) { 1343 return result; 1344 } 1345 return execOperationWithResult( 1346 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1347 @Override 1348 public Result call(RegionObserver observer) throws IOException { 1349 return observer.postIncrement(this, increment, getResult()); 1350 } 1351 }); 1352 } 1353 1354 /** 1355 * @param scan the Scan specification 1356 * @exception IOException Exception 1357 */ 1358 public void preScannerOpen(final Scan scan) throws IOException { 1359 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1360 @Override 1361 public void call(RegionObserver observer) throws IOException { 1362 observer.preScannerOpen(this, scan); 1363 } 1364 }); 1365 } 1366 1367 /** 1368 * @param scan the Scan specification 1369 * @param s the scanner 1370 * @return the scanner instance to use 1371 * @exception IOException Exception 1372 */ 1373 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1374 if (this.coprocEnvironments.isEmpty()) { 1375 return s; 1376 } 1377 return execOperationWithResult( 1378 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1379 @Override 1380 public RegionScanner call(RegionObserver observer) throws IOException { 1381 return observer.postScannerOpen(this, scan, getResult()); 1382 } 1383 }); 1384 } 1385 1386 /** 1387 * @param s the scanner 1388 * @param results the result set returned by the region server 1389 * @param limit the maximum number of results to return 1390 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1391 * @exception IOException Exception 1392 */ 1393 public Boolean preScannerNext(final InternalScanner s, 1394 final List<Result> results, final int limit) throws IOException { 1395 boolean bypassable = true; 1396 boolean defaultResult = false; 1397 if (coprocEnvironments.isEmpty()) { 1398 return null; 1399 } 1400 return execOperationWithResult( 1401 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1402 defaultResult, bypassable) { 1403 @Override 1404 public Boolean call(RegionObserver observer) throws IOException { 1405 return observer.preScannerNext(this, s, results, limit, getResult()); 1406 } 1407 }); 1408 } 1409 1410 /** 1411 * @param s the scanner 1412 * @param results the result set returned by the region server 1413 * @param limit the maximum number of results to return 1414 * @param hasMore 1415 * @return 'has more' indication to give to client 1416 * @exception IOException Exception 1417 */ 1418 public boolean postScannerNext(final InternalScanner s, 1419 final List<Result> results, final int limit, boolean hasMore) 1420 throws IOException { 1421 if (this.coprocEnvironments.isEmpty()) { 1422 return hasMore; 1423 } 1424 return execOperationWithResult( 1425 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1426 @Override 1427 public Boolean call(RegionObserver observer) throws IOException { 1428 return observer.postScannerNext(this, s, results, limit, getResult()); 1429 } 1430 }); 1431 } 1432 1433 /** 1434 * This will be called by the scan flow when the current scanned row is being filtered out by the 1435 * filter. 1436 * @param s the scanner 1437 * @param curRowCell The cell in the current row which got filtered out 1438 * @return whether more rows are available for the scanner or not 1439 * @throws IOException 1440 */ 1441 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1442 throws IOException { 1443 // short circuit for performance 1444 boolean defaultResult = true; 1445 if (!hasCustomPostScannerFilterRow) { 1446 return defaultResult; 1447 } 1448 if (this.coprocEnvironments.isEmpty()) { 1449 return defaultResult; 1450 } 1451 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1452 regionObserverGetter, defaultResult) { 1453 @Override 1454 public Boolean call(RegionObserver observer) throws IOException { 1455 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1456 } 1457 }); 1458 } 1459 1460 /** 1461 * Supports Coprocessor 'bypass'. 1462 * @param s the scanner 1463 * @return true if default behavior should be bypassed, false otherwise 1464 * @exception IOException Exception 1465 */ 1466 // Should this be bypassable? 1467 public boolean preScannerClose(final InternalScanner s) throws IOException { 1468 return execOperation(coprocEnvironments.isEmpty()? null: 1469 new RegionObserverOperationWithoutResult(true) { 1470 @Override 1471 public void call(RegionObserver observer) throws IOException { 1472 observer.preScannerClose(this, s); 1473 } 1474 }); 1475 } 1476 1477 /** 1478 * @exception IOException Exception 1479 */ 1480 public void postScannerClose(final InternalScanner s) throws IOException { 1481 execOperation(coprocEnvironments.isEmpty()? null: 1482 new RegionObserverOperationWithoutResult() { 1483 @Override 1484 public void call(RegionObserver observer) throws IOException { 1485 observer.postScannerClose(this, s); 1486 } 1487 }); 1488 } 1489 1490 /** 1491 * Called before open store scanner for user scan. 1492 */ 1493 public ScanInfo preStoreScannerOpen(HStore store) throws IOException { 1494 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1495 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 1496 execOperation(new RegionObserverOperationWithoutResult() { 1497 @Override 1498 public void call(RegionObserver observer) throws IOException { 1499 observer.preStoreScannerOpen(this, store, builder); 1500 } 1501 }); 1502 return builder.build(); 1503 } 1504 1505 /** 1506 * @param info the RegionInfo for this region 1507 * @param edits the file of recovered edits 1508 */ 1509 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1510 execOperation(coprocEnvironments.isEmpty()? null: 1511 new RegionObserverOperationWithoutResult(true) { 1512 @Override 1513 public void call(RegionObserver observer) throws IOException { 1514 observer.preReplayWALs(this, info, edits); 1515 } 1516 }); 1517 } 1518 1519 /** 1520 * @param info the RegionInfo for this region 1521 * @param edits the file of recovered edits 1522 * @throws IOException Exception 1523 */ 1524 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1525 execOperation(coprocEnvironments.isEmpty()? null: 1526 new RegionObserverOperationWithoutResult() { 1527 @Override 1528 public void call(RegionObserver observer) throws IOException { 1529 observer.postReplayWALs(this, info, edits); 1530 } 1531 }); 1532 } 1533 1534 /** 1535 * Supports Coprocessor 'bypass'. 1536 * @return true if default behavior should be bypassed, false otherwise 1537 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1538 * with something that doesn't expose IntefaceAudience.Private classes. 1539 */ 1540 @Deprecated 1541 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1542 throws IOException { 1543 return execOperation(coprocEnvironments.isEmpty()? null: 1544 new RegionObserverOperationWithoutResult(true) { 1545 @Override 1546 public void call(RegionObserver observer) throws IOException { 1547 observer.preWALRestore(this, info, logKey, logEdit); 1548 } 1549 }); 1550 } 1551 1552 /** 1553 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1554 * with something that doesn't expose IntefaceAudience.Private classes. 1555 */ 1556 @Deprecated 1557 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1558 throws IOException { 1559 execOperation(coprocEnvironments.isEmpty()? null: 1560 new RegionObserverOperationWithoutResult() { 1561 @Override 1562 public void call(RegionObserver observer) throws IOException { 1563 observer.postWALRestore(this, info, logKey, logEdit); 1564 } 1565 }); 1566 } 1567 1568 /** 1569 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1570 */ 1571 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1572 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1573 @Override 1574 public void call(RegionObserver observer) throws IOException { 1575 observer.preBulkLoadHFile(this, familyPaths); 1576 } 1577 }); 1578 } 1579 1580 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1581 throws IOException { 1582 return execOperation(coprocEnvironments.isEmpty()? null: 1583 new RegionObserverOperationWithoutResult() { 1584 @Override 1585 public void call(RegionObserver observer) throws IOException { 1586 observer.preCommitStoreFile(this, family, pairs); 1587 } 1588 }); 1589 } 1590 1591 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1592 execOperation(coprocEnvironments.isEmpty()? null: 1593 new RegionObserverOperationWithoutResult() { 1594 @Override 1595 public void call(RegionObserver observer) throws IOException { 1596 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1597 } 1598 }); 1599 } 1600 1601 /** 1602 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1603 * @param map Map of CF to List of file paths for the final loaded files 1604 * @throws IOException 1605 */ 1606 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1607 Map<byte[], List<Path>> map) throws IOException { 1608 if (this.coprocEnvironments.isEmpty()) { 1609 return; 1610 } 1611 execOperation(coprocEnvironments.isEmpty()? null: 1612 new RegionObserverOperationWithoutResult() { 1613 @Override 1614 public void call(RegionObserver observer) throws IOException { 1615 observer.postBulkLoadHFile(this, familyPaths, map); 1616 } 1617 }); 1618 } 1619 1620 public void postStartRegionOperation(final Operation op) throws IOException { 1621 execOperation(coprocEnvironments.isEmpty()? null: 1622 new RegionObserverOperationWithoutResult() { 1623 @Override 1624 public void call(RegionObserver observer) throws IOException { 1625 observer.postStartRegionOperation(this, op); 1626 } 1627 }); 1628 } 1629 1630 public void postCloseRegionOperation(final Operation op) throws IOException { 1631 execOperation(coprocEnvironments.isEmpty()? null: 1632 new RegionObserverOperationWithoutResult() { 1633 @Override 1634 public void call(RegionObserver observer) throws IOException { 1635 observer.postCloseRegionOperation(this, op); 1636 } 1637 }); 1638 } 1639 1640 /** 1641 * @param fs fileystem to read from 1642 * @param p path to the file 1643 * @param in {@link FSDataInputStreamWrapper} 1644 * @param size Full size of the file 1645 * @param cacheConf 1646 * @param r original reference file. This will be not null only when reading a split file. 1647 * @return a Reader instance to use instead of the base reader if overriding 1648 * default behavior, null otherwise 1649 * @throws IOException 1650 */ 1651 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1652 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1653 final Reference r) throws IOException { 1654 if (coprocEnvironments.isEmpty()) { 1655 return null; 1656 } 1657 return execOperationWithResult( 1658 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1659 @Override 1660 public StoreFileReader call(RegionObserver observer) throws IOException { 1661 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1662 getResult()); 1663 } 1664 }); 1665 } 1666 1667 /** 1668 * @param fs fileystem to read from 1669 * @param p path to the file 1670 * @param in {@link FSDataInputStreamWrapper} 1671 * @param size Full size of the file 1672 * @param cacheConf 1673 * @param r original reference file. This will be not null only when reading a split file. 1674 * @param reader the base reader instance 1675 * @return The reader to use 1676 * @throws IOException 1677 */ 1678 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1679 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1680 final Reference r, final StoreFileReader reader) throws IOException { 1681 if (this.coprocEnvironments.isEmpty()) { 1682 return reader; 1683 } 1684 return execOperationWithResult( 1685 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1686 @Override 1687 public StoreFileReader call(RegionObserver observer) throws IOException { 1688 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1689 getResult()); 1690 } 1691 }); 1692 } 1693 1694 public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, 1695 final Cell oldCell, Cell newCell) throws IOException { 1696 if (this.coprocEnvironments.isEmpty()) { 1697 return newCell; 1698 } 1699 return execOperationWithResult( 1700 new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) { 1701 @Override 1702 public Cell call(RegionObserver observer) throws IOException { 1703 return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); 1704 } 1705 }); 1706 } 1707 1708 public Message preEndpointInvocation(final Service service, final String methodName, 1709 Message request) throws IOException { 1710 if (coprocEnvironments.isEmpty()) { 1711 return request; 1712 } 1713 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1714 Message>(endpointObserverGetter, request) { 1715 @Override 1716 public Message call(EndpointObserver observer) throws IOException { 1717 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1718 } 1719 }); 1720 } 1721 1722 public void postEndpointInvocation(final Service service, final String methodName, 1723 final Message request, final Message.Builder responseBuilder) throws IOException { 1724 execOperation(coprocEnvironments.isEmpty() ? null : 1725 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1726 @Override 1727 public void call(EndpointObserver observer) throws IOException { 1728 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1729 } 1730 }); 1731 } 1732 1733 /** 1734 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1735 */ 1736 @Deprecated 1737 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1738 if (this.coprocEnvironments.isEmpty()) { 1739 return result; 1740 } 1741 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1742 regionObserverGetter, result) { 1743 @Override 1744 public DeleteTracker call(RegionObserver observer) throws IOException { 1745 return observer.postInstantiateDeleteTracker(this, getResult()); 1746 } 1747 }); 1748 } 1749 1750 ///////////////////////////////////////////////////////////////////////////////////////////////// 1751 // BulkLoadObserver hooks 1752 ///////////////////////////////////////////////////////////////////////////////////////////////// 1753 public void prePrepareBulkLoad(User user) throws IOException { 1754 execOperation(coprocEnvironments.isEmpty() ? null : 1755 new BulkLoadObserverOperation(user) { 1756 @Override protected void call(BulkLoadObserver observer) throws IOException { 1757 observer.prePrepareBulkLoad(this); 1758 } 1759 }); 1760 } 1761 1762 public void preCleanupBulkLoad(User user) throws IOException { 1763 execOperation(coprocEnvironments.isEmpty() ? null : 1764 new BulkLoadObserverOperation(user) { 1765 @Override protected void call(BulkLoadObserver observer) throws IOException { 1766 observer.preCleanupBulkLoad(this); 1767 } 1768 }); 1769 } 1770}