001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import com.google.protobuf.Message; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.RawCellBuilder; 041import org.apache.hadoop.hbase.RawCellBuilderFactory; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.SharedConnection; 044import org.apache.hadoop.hbase.client.Append; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.client.Delete; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Increment; 050import org.apache.hadoop.hbase.client.Mutation; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 057import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 061import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 062import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 063import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 064import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 065import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 066import org.apache.hadoop.hbase.coprocessor.ObserverContext; 067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 069import org.apache.hadoop.hbase.coprocessor.RegionObserver; 070import org.apache.hadoop.hbase.filter.ByteArrayComparable; 071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 072import org.apache.hadoop.hbase.io.Reference; 073import org.apache.hadoop.hbase.io.hfile.CacheConfig; 074import org.apache.hadoop.hbase.metrics.MetricRegistry; 075import org.apache.hadoop.hbase.regionserver.Region.Operation; 076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 079import org.apache.hadoop.hbase.security.User; 080import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALKey; 084import org.apache.yetus.audience.InterfaceAudience; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 090 091/** 092 * Implements the coprocessor environment and runtime support for coprocessors 093 * loaded within a {@link Region}. 094 */ 095@InterfaceAudience.Private 096public class RegionCoprocessorHost 097 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 098 099 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 100 // The shared data map 101 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 102 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 103 AbstractReferenceMap.ReferenceStrength.WEAK); 104 105 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 106 private final boolean hasCustomPostScannerFilterRow; 107 108 /** 109 * 110 * Encapsulation of the environment of each coprocessor 111 */ 112 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 113 implements RegionCoprocessorEnvironment { 114 private Region region; 115 ConcurrentMap<String, Object> sharedData; 116 private final MetricRegistry metricRegistry; 117 private final RegionServerServices services; 118 119 /** 120 * Constructor 121 * @param impl the coprocessor instance 122 * @param priority chaining priority 123 */ 124 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 125 final int seq, final Configuration conf, final Region region, 126 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 127 super(impl, priority, seq, conf); 128 this.region = region; 129 this.sharedData = sharedData; 130 this.services = services; 131 this.metricRegistry = 132 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 133 } 134 135 /** @return the region */ 136 @Override 137 public Region getRegion() { 138 return region; 139 } 140 141 @Override 142 public OnlineRegions getOnlineRegions() { 143 return this.services; 144 } 145 146 @Override 147 public Connection getConnection() { 148 // Mocks may have services as null at test time. 149 return services != null ? new SharedConnection(services.getConnection()) : null; 150 } 151 152 @Override 153 public Connection createConnection(Configuration conf) throws IOException { 154 return services != null ? this.services.createConnection(conf) : null; 155 } 156 157 @Override 158 public ServerName getServerName() { 159 return services != null? services.getServerName(): null; 160 } 161 162 @Override 163 public void shutdown() { 164 super.shutdown(); 165 MetricsCoprocessor.removeRegistry(this.metricRegistry); 166 } 167 168 @Override 169 public ConcurrentMap<String, Object> getSharedData() { 170 return sharedData; 171 } 172 173 @Override 174 public RegionInfo getRegionInfo() { 175 return region.getRegionInfo(); 176 } 177 178 @Override 179 public MetricRegistry getMetricRegistryForRegionServer() { 180 return metricRegistry; 181 } 182 183 @Override 184 public RawCellBuilder getCellBuilder() { 185 // We always do a DEEP_COPY only 186 return RawCellBuilderFactory.create(); 187 } 188 } 189 190 /** 191 * Special version of RegionEnvironment that exposes RegionServerServices for Core 192 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 193 */ 194 private static class RegionEnvironmentForCoreCoprocessors extends 195 RegionEnvironment implements HasRegionServerServices { 196 private final RegionServerServices rsServices; 197 198 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 199 final int seq, final Configuration conf, final Region region, 200 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 201 super(impl, priority, seq, conf, region, services, sharedData); 202 this.rsServices = services; 203 } 204 205 /** 206 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 207 * consumption. 208 */ 209 @Override 210 public RegionServerServices getRegionServerServices() { 211 return this.rsServices; 212 } 213 } 214 215 static class TableCoprocessorAttribute { 216 private Path path; 217 private String className; 218 private int priority; 219 private Configuration conf; 220 221 public TableCoprocessorAttribute(Path path, String className, int priority, 222 Configuration conf) { 223 this.path = path; 224 this.className = className; 225 this.priority = priority; 226 this.conf = conf; 227 } 228 229 public Path getPath() { 230 return path; 231 } 232 233 public String getClassName() { 234 return className; 235 } 236 237 public int getPriority() { 238 return priority; 239 } 240 241 public Configuration getConf() { 242 return conf; 243 } 244 } 245 246 /** The region server services */ 247 RegionServerServices rsServices; 248 /** The region */ 249 HRegion region; 250 251 /** 252 * Constructor 253 * @param region the region 254 * @param rsServices interface to available region server functionality 255 * @param conf the configuration 256 */ 257 public RegionCoprocessorHost(final HRegion region, 258 final RegionServerServices rsServices, final Configuration conf) { 259 super(rsServices); 260 this.conf = conf; 261 this.rsServices = rsServices; 262 this.region = region; 263 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 264 265 // load system default cp's from configuration. 266 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 267 268 // load system default cp's for user tables from configuration. 269 if (!region.getRegionInfo().getTable().isSystemTable()) { 270 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 271 } 272 273 // load Coprocessor From HDFS 274 loadTableCoprocessors(conf); 275 276 // now check whether any coprocessor implements postScannerFilterRow 277 boolean hasCustomPostScannerFilterRow = false; 278 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 279 if (env.getInstance() instanceof RegionObserver) { 280 Class<?> clazz = env.getInstance().getClass(); 281 for(;;) { 282 if (clazz == null) { 283 // we must have directly implemented RegionObserver 284 hasCustomPostScannerFilterRow = true; 285 break out; 286 } 287 try { 288 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 289 InternalScanner.class, Cell.class, boolean.class); 290 // this coprocessor has a custom version of postScannerFilterRow 291 hasCustomPostScannerFilterRow = true; 292 break out; 293 } catch (NoSuchMethodException ignore) { 294 } 295 // the deprecated signature still exists 296 try { 297 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 298 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 299 // this coprocessor has a custom version of postScannerFilterRow 300 hasCustomPostScannerFilterRow = true; 301 break out; 302 } catch (NoSuchMethodException ignore) { 303 } 304 clazz = clazz.getSuperclass(); 305 } 306 } 307 } 308 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 309 } 310 311 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 312 TableDescriptor htd) { 313 return htd.getCoprocessorDescriptors().stream().map(cp -> { 314 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 315 Configuration ourConf; 316 if (!cp.getProperties().isEmpty()) { 317 // do an explicit deep copy of the passed configuration 318 ourConf = new Configuration(false); 319 HBaseConfiguration.merge(ourConf, conf); 320 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 321 } else { 322 ourConf = conf; 323 } 324 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 325 }).collect(Collectors.toList()); 326 } 327 328 /** 329 * Sanity check the table coprocessor attributes of the supplied schema. Will 330 * throw an exception if there is a problem. 331 * @param conf 332 * @param htd 333 * @throws IOException 334 */ 335 public static void testTableCoprocessorAttrs(final Configuration conf, 336 final TableDescriptor htd) throws IOException { 337 String pathPrefix = UUID.randomUUID().toString(); 338 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 339 if (attr.getPriority() < 0) { 340 throw new IOException("Priority for coprocessor " + attr.getClassName() + 341 " cannot be less than 0"); 342 } 343 ClassLoader old = Thread.currentThread().getContextClassLoader(); 344 try { 345 ClassLoader cl; 346 if (attr.getPath() != null) { 347 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 348 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 349 } else { 350 cl = CoprocessorHost.class.getClassLoader(); 351 } 352 Thread.currentThread().setContextClassLoader(cl); 353 if (cl instanceof CoprocessorClassLoader) { 354 String[] includedClassPrefixes = null; 355 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 356 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 357 includedClassPrefixes = prefixes.split(";"); 358 } 359 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); 360 } else { 361 cl.loadClass(attr.getClassName()); 362 } 363 } catch (ClassNotFoundException e) { 364 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 365 } finally { 366 Thread.currentThread().setContextClassLoader(old); 367 } 368 } 369 } 370 371 void loadTableCoprocessors(final Configuration conf) { 372 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 373 DEFAULT_COPROCESSORS_ENABLED); 374 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 375 DEFAULT_USER_COPROCESSORS_ENABLED); 376 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 377 return; 378 } 379 380 // scan the table attributes for coprocessor load specifications 381 // initialize the coprocessors 382 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 383 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 384 region.getTableDescriptor())) { 385 // Load encompasses classloading and coprocessor initialization 386 try { 387 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 388 attr.getPriority(), attr.getConf()); 389 if (env == null) { 390 continue; 391 } 392 configured.add(env); 393 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 394 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 395 } catch (Throwable t) { 396 // Coprocessor failed to load, do we abort on error? 397 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 398 abortServer(attr.getClassName(), t); 399 } else { 400 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 401 } 402 } 403 } 404 // add together to coprocessor set for COW efficiency 405 coprocEnvironments.addAll(configured); 406 } 407 408 @Override 409 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 410 Configuration conf) { 411 // If coprocessor exposes any services, register them. 412 for (Service service : instance.getServices()) { 413 region.registerService(service); 414 } 415 ConcurrentMap<String, Object> classData; 416 // make sure only one thread can add maps 417 synchronized (SHARED_DATA_MAP) { 418 // as long as at least one RegionEnvironment holds on to its classData it will 419 // remain in this map 420 classData = 421 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 422 k -> new ConcurrentHashMap<>()); 423 } 424 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 425 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 426 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 427 rsServices, classData): 428 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 429 } 430 431 @Override 432 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 433 throws InstantiationException, IllegalAccessException { 434 try { 435 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 436 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 437 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 438 // For backward compatibility with old CoprocessorService impl which don't extend 439 // RegionCoprocessor. 440 CoprocessorService cs; 441 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 442 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 443 } else { 444 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 445 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 446 return null; 447 } 448 } catch (NoSuchMethodException | InvocationTargetException e) { 449 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 450 } 451 } 452 453 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 454 RegionCoprocessor::getRegionObserver; 455 456 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 457 RegionCoprocessor::getEndpointObserver; 458 459 abstract class RegionObserverOperationWithoutResult extends 460 ObserverOperationWithoutResult<RegionObserver> { 461 public RegionObserverOperationWithoutResult() { 462 super(regionObserverGetter); 463 } 464 465 public RegionObserverOperationWithoutResult(User user) { 466 super(regionObserverGetter, user); 467 } 468 469 public RegionObserverOperationWithoutResult(boolean bypassable) { 470 super(regionObserverGetter, null, bypassable); 471 } 472 473 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 474 super(regionObserverGetter, user, bypassable); 475 } 476 } 477 478 abstract class BulkLoadObserverOperation extends 479 ObserverOperationWithoutResult<BulkLoadObserver> { 480 public BulkLoadObserverOperation(User user) { 481 super(RegionCoprocessor::getBulkLoadObserver, user); 482 } 483 } 484 485 486 ////////////////////////////////////////////////////////////////////////////////////////////////// 487 // Observer operations 488 ////////////////////////////////////////////////////////////////////////////////////////////////// 489 490 ////////////////////////////////////////////////////////////////////////////////////////////////// 491 // Observer operations 492 ////////////////////////////////////////////////////////////////////////////////////////////////// 493 494 /** 495 * Invoked before a region open. 496 * 497 * @throws IOException Signals that an I/O exception has occurred. 498 */ 499 public void preOpen() throws IOException { 500 if (coprocEnvironments.isEmpty()) { 501 return; 502 } 503 execOperation(new RegionObserverOperationWithoutResult() { 504 @Override 505 public void call(RegionObserver observer) throws IOException { 506 observer.preOpen(this); 507 } 508 }); 509 } 510 511 512 /** 513 * Invoked after a region open 514 */ 515 public void postOpen() { 516 if (coprocEnvironments.isEmpty()) { 517 return; 518 } 519 try { 520 execOperation(new RegionObserverOperationWithoutResult() { 521 @Override 522 public void call(RegionObserver observer) throws IOException { 523 observer.postOpen(this); 524 } 525 }); 526 } catch (IOException e) { 527 LOG.warn(e.toString(), e); 528 } 529 } 530 531 /** 532 * Invoked before a region is closed 533 * @param abortRequested true if the server is aborting 534 */ 535 public void preClose(final boolean abortRequested) throws IOException { 536 execOperation(new RegionObserverOperationWithoutResult() { 537 @Override 538 public void call(RegionObserver observer) throws IOException { 539 observer.preClose(this, abortRequested); 540 } 541 }); 542 } 543 544 /** 545 * Invoked after a region is closed 546 * @param abortRequested true if the server is aborting 547 */ 548 public void postClose(final boolean abortRequested) { 549 try { 550 execOperation(new RegionObserverOperationWithoutResult() { 551 @Override 552 public void call(RegionObserver observer) throws IOException { 553 observer.postClose(this, abortRequested); 554 } 555 556 @Override 557 public void postEnvCall() { 558 shutdown(this.getEnvironment()); 559 } 560 }); 561 } catch (IOException e) { 562 LOG.warn(e.toString(), e); 563 } 564 } 565 566 /** 567 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 568 * available candidates. 569 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 570 * the passed in <code>candidates</code>. 571 * @param store The store where compaction is being requested 572 * @param candidates The currently available store files 573 * @param tracker used to track the life cycle of a compaction 574 * @param user the user 575 * @throws IOException 576 */ 577 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 578 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 579 if (coprocEnvironments.isEmpty()) { 580 return false; 581 } 582 boolean bypassable = true; 583 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 584 @Override 585 public void call(RegionObserver observer) throws IOException { 586 observer.preCompactSelection(this, store, candidates, tracker); 587 } 588 }); 589 } 590 591 /** 592 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 593 * candidates. 594 * @param store The store where compaction is being requested 595 * @param selected The store files selected to compact 596 * @param tracker used to track the life cycle of a compaction 597 * @param request the compaction request 598 * @param user the user 599 */ 600 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 601 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 602 final User user) throws IOException { 603 if (coprocEnvironments.isEmpty()) { 604 return; 605 } 606 execOperation(new RegionObserverOperationWithoutResult(user) { 607 @Override 608 public void call(RegionObserver observer) throws IOException { 609 observer.postCompactSelection(this, store, selected, tracker, request); 610 } 611 }); 612 } 613 614 /** 615 * Called prior to opening store scanner for compaction. 616 */ 617 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 618 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 619 if (coprocEnvironments.isEmpty()) { 620 return store.getScanInfo(); 621 } 622 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 623 execOperation(new RegionObserverOperationWithoutResult(user) { 624 @Override 625 public void call(RegionObserver observer) throws IOException { 626 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 627 } 628 }); 629 return builder.build(); 630 } 631 632 /** 633 * Called prior to rewriting the store files selected for compaction 634 * @param store the store being compacted 635 * @param scanner the scanner used to read store data during compaction 636 * @param scanType type of Scan 637 * @param tracker used to track the life cycle of a compaction 638 * @param request the compaction request 639 * @param user the user 640 * @return Scanner to use (cannot be null!) 641 * @throws IOException 642 */ 643 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 644 final ScanType scanType, final CompactionLifeCycleTracker tracker, 645 final CompactionRequest request, final User user) throws IOException { 646 InternalScanner defaultResult = scanner; 647 if (coprocEnvironments.isEmpty()) { 648 return defaultResult; 649 } 650 return execOperationWithResult( 651 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 652 defaultResult, user) { 653 @Override 654 public InternalScanner call(RegionObserver observer) throws IOException { 655 InternalScanner scanner = 656 observer.preCompact(this, store, getResult(), scanType, tracker, request); 657 if (scanner == null) { 658 throw new CoprocessorException("Null Scanner return disallowed!"); 659 } 660 return scanner; 661 } 662 }); 663 } 664 665 /** 666 * Called after the store compaction has completed. 667 * @param store the store being compacted 668 * @param resultFile the new store file written during compaction 669 * @param tracker used to track the life cycle of a compaction 670 * @param request the compaction request 671 * @param user the user 672 * @throws IOException 673 */ 674 public void postCompact(final HStore store, final HStoreFile resultFile, 675 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 676 throws IOException { 677 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 678 @Override 679 public void call(RegionObserver observer) throws IOException { 680 observer.postCompact(this, store, resultFile, tracker, request); 681 } 682 }); 683 } 684 685 /** 686 * Invoked before create StoreScanner for flush. 687 */ 688 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 689 throws IOException { 690 if (coprocEnvironments.isEmpty()) { 691 return store.getScanInfo(); 692 } 693 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 694 execOperation(new RegionObserverOperationWithoutResult() { 695 @Override 696 public void call(RegionObserver observer) throws IOException { 697 observer.preFlushScannerOpen(this, store, builder, tracker); 698 } 699 }); 700 return builder.build(); 701 } 702 703 /** 704 * Invoked before a memstore flush 705 * @return Scanner to use (cannot be null!) 706 * @throws IOException 707 */ 708 public InternalScanner preFlush(HStore store, InternalScanner scanner, 709 FlushLifeCycleTracker tracker) throws IOException { 710 if (coprocEnvironments.isEmpty()) { 711 return scanner; 712 } 713 return execOperationWithResult( 714 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 715 @Override 716 public InternalScanner call(RegionObserver observer) throws IOException { 717 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 718 if (scanner == null) { 719 throw new CoprocessorException("Null Scanner return disallowed!"); 720 } 721 return scanner; 722 } 723 }); 724 } 725 726 /** 727 * Invoked before a memstore flush 728 * @throws IOException 729 */ 730 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 731 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 732 @Override 733 public void call(RegionObserver observer) throws IOException { 734 observer.preFlush(this, tracker); 735 } 736 }); 737 } 738 739 /** 740 * Invoked after a memstore flush 741 * @throws IOException 742 */ 743 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 744 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 745 @Override 746 public void call(RegionObserver observer) throws IOException { 747 observer.postFlush(this, tracker); 748 } 749 }); 750 } 751 752 /** 753 * Invoked before in memory compaction. 754 */ 755 public void preMemStoreCompaction(HStore store) throws IOException { 756 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 757 @Override 758 public void call(RegionObserver observer) throws IOException { 759 observer.preMemStoreCompaction(this, store); 760 } 761 }); 762 } 763 764 /** 765 * Invoked before create StoreScanner for in memory compaction. 766 */ 767 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 768 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 769 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 770 @Override 771 public void call(RegionObserver observer) throws IOException { 772 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 773 } 774 }); 775 return builder.build(); 776 } 777 778 /** 779 * Invoked before compacting memstore. 780 */ 781 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 782 throws IOException { 783 if (coprocEnvironments.isEmpty()) { 784 return scanner; 785 } 786 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 787 regionObserverGetter, scanner) { 788 @Override 789 public InternalScanner call(RegionObserver observer) throws IOException { 790 return observer.preMemStoreCompactionCompact(this, store, getResult()); 791 } 792 }); 793 } 794 795 /** 796 * Invoked after in memory compaction. 797 */ 798 public void postMemStoreCompaction(HStore store) throws IOException { 799 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 800 @Override 801 public void call(RegionObserver observer) throws IOException { 802 observer.postMemStoreCompaction(this, store); 803 } 804 }); 805 } 806 807 /** 808 * Invoked after a memstore flush 809 * @throws IOException 810 */ 811 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 812 throws IOException { 813 if (coprocEnvironments.isEmpty()) { 814 return; 815 } 816 execOperation(new RegionObserverOperationWithoutResult() { 817 @Override 818 public void call(RegionObserver observer) throws IOException { 819 observer.postFlush(this, store, storeFile, tracker); 820 } 821 }); 822 } 823 824 // RegionObserver support 825 /** 826 * Supports Coprocessor 'bypass'. 827 * @param get the Get request 828 * @param results What to return if return is true/'bypass'. 829 * @return true if default processing should be bypassed. 830 * @exception IOException Exception 831 */ 832 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 833 if (coprocEnvironments.isEmpty()) { 834 return false; 835 } 836 boolean bypassable = true; 837 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 838 @Override 839 public void call(RegionObserver observer) throws IOException { 840 observer.preGetOp(this, get, results); 841 } 842 }); 843 } 844 845 /** 846 * @param get the Get request 847 * @param results the result set 848 * @exception IOException Exception 849 */ 850 public void postGet(final Get get, final List<Cell> results) 851 throws IOException { 852 if (coprocEnvironments.isEmpty()) { 853 return; 854 } 855 execOperation(new RegionObserverOperationWithoutResult() { 856 @Override 857 public void call(RegionObserver observer) throws IOException { 858 observer.postGetOp(this, get, results); 859 } 860 }); 861 } 862 863 /** 864 * Supports Coprocessor 'bypass'. 865 * @param get the Get request 866 * @return true or false to return to client if bypassing normal operation, or null otherwise 867 * @exception IOException Exception 868 */ 869 public Boolean preExists(final Get get) throws IOException { 870 boolean bypassable = true; 871 boolean defaultResult = false; 872 if (coprocEnvironments.isEmpty()) { 873 return null; 874 } 875 return execOperationWithResult( 876 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 877 defaultResult, bypassable) { 878 @Override 879 public Boolean call(RegionObserver observer) throws IOException { 880 return observer.preExists(this, get, getResult()); 881 } 882 }); 883 } 884 885 /** 886 * @param get the Get request 887 * @param result the result returned by the region server 888 * @return the result to return to the client 889 * @exception IOException Exception 890 */ 891 public boolean postExists(final Get get, boolean result) 892 throws IOException { 893 if (this.coprocEnvironments.isEmpty()) { 894 return result; 895 } 896 return execOperationWithResult( 897 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 898 @Override 899 public Boolean call(RegionObserver observer) throws IOException { 900 return observer.postExists(this, get, getResult()); 901 } 902 }); 903 } 904 905 /** 906 * Supports Coprocessor 'bypass'. 907 * @param put The Put object 908 * @param edit The WALEdit object. 909 * @param durability The durability used 910 * @return true if default processing should be bypassed 911 * @exception IOException Exception 912 */ 913 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 914 throws IOException { 915 if (coprocEnvironments.isEmpty()) { 916 return false; 917 } 918 boolean bypassable = true; 919 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 920 @Override 921 public void call(RegionObserver observer) throws IOException { 922 observer.prePut(this, put, edit, durability); 923 } 924 }); 925 } 926 927 /** 928 * Supports Coprocessor 'bypass'. 929 * @param mutation - the current mutation 930 * @param kv - the current cell 931 * @param byteNow - current timestamp in bytes 932 * @param get - the get that could be used 933 * Note that the get only does not specify the family and qualifier that should be used 934 * @return true if default processing should be bypassed 935 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 936 * Coprocessor for its needs only. Will be removed. 937 */ 938 @Deprecated 939 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 940 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 941 if (coprocEnvironments.isEmpty()) { 942 return false; 943 } 944 boolean bypassable = true; 945 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 946 @Override 947 public void call(RegionObserver observer) throws IOException { 948 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 949 } 950 }); 951 } 952 953 /** 954 * @param put The Put object 955 * @param edit The WALEdit object. 956 * @param durability The durability used 957 * @exception IOException Exception 958 */ 959 public void postPut(final Put put, final WALEdit edit, final Durability durability) 960 throws IOException { 961 if (coprocEnvironments.isEmpty()) { 962 return; 963 } 964 execOperation(new RegionObserverOperationWithoutResult() { 965 @Override 966 public void call(RegionObserver observer) throws IOException { 967 observer.postPut(this, put, edit, durability); 968 } 969 }); 970 } 971 972 /** 973 * Supports Coprocessor 'bypass'. 974 * @param delete The Delete object 975 * @param edit The WALEdit object. 976 * @param durability The durability used 977 * @return true if default processing should be bypassed 978 * @exception IOException Exception 979 */ 980 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 981 throws IOException { 982 if (this.coprocEnvironments.isEmpty()) { 983 return false; 984 } 985 boolean bypassable = true; 986 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 987 @Override 988 public void call(RegionObserver observer) throws IOException { 989 observer.preDelete(this, delete, edit, durability); 990 } 991 }); 992 } 993 994 /** 995 * @param delete The Delete object 996 * @param edit The WALEdit object. 997 * @param durability The durability used 998 * @exception IOException Exception 999 */ 1000 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 1001 throws IOException { 1002 execOperation(coprocEnvironments.isEmpty()? null: 1003 new RegionObserverOperationWithoutResult() { 1004 @Override 1005 public void call(RegionObserver observer) throws IOException { 1006 observer.postDelete(this, delete, edit, durability); 1007 } 1008 }); 1009 } 1010 1011 public void preBatchMutate( 1012 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1013 if(this.coprocEnvironments.isEmpty()) { 1014 return; 1015 } 1016 execOperation(new RegionObserverOperationWithoutResult() { 1017 @Override 1018 public void call(RegionObserver observer) throws IOException { 1019 observer.preBatchMutate(this, miniBatchOp); 1020 } 1021 }); 1022 } 1023 1024 public void postBatchMutate( 1025 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1026 if (this.coprocEnvironments.isEmpty()) { 1027 return; 1028 } 1029 execOperation(new RegionObserverOperationWithoutResult() { 1030 @Override 1031 public void call(RegionObserver observer) throws IOException { 1032 observer.postBatchMutate(this, miniBatchOp); 1033 } 1034 }); 1035 } 1036 1037 public void postBatchMutateIndispensably( 1038 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1039 throws IOException { 1040 if (this.coprocEnvironments.isEmpty()) { 1041 return; 1042 } 1043 execOperation(new RegionObserverOperationWithoutResult() { 1044 @Override 1045 public void call(RegionObserver observer) throws IOException { 1046 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1047 } 1048 }); 1049 } 1050 1051 /** 1052 * Supports Coprocessor 'bypass'. 1053 * @param row row to check 1054 * @param family column family 1055 * @param qualifier column qualifier 1056 * @param op the comparison operation 1057 * @param comparator the comparator 1058 * @param put data to put if check succeeds 1059 * @return true or false to return to client if default processing should be bypassed, or null 1060 * otherwise 1061 */ 1062 public Boolean preCheckAndPut(final byte [] row, final byte [] family, 1063 final byte [] qualifier, final CompareOperator op, 1064 final ByteArrayComparable comparator, final Put put) 1065 throws IOException { 1066 boolean bypassable = true; 1067 boolean defaultResult = false; 1068 if (coprocEnvironments.isEmpty()) { 1069 return null; 1070 } 1071 return execOperationWithResult( 1072 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1073 defaultResult, bypassable) { 1074 @Override 1075 public Boolean call(RegionObserver observer) throws IOException { 1076 return observer.preCheckAndPut(this, row, family, qualifier, 1077 op, comparator, put, getResult()); 1078 } 1079 }); 1080 } 1081 1082 /** 1083 * Supports Coprocessor 'bypass'. 1084 * @param row row to check 1085 * @param family column family 1086 * @param qualifier column qualifier 1087 * @param op the comparison operation 1088 * @param comparator the comparator 1089 * @param put data to put if check succeeds 1090 * @return true or false to return to client if default processing should be bypassed, or null 1091 * otherwise 1092 */ 1093 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1094 justification="Null is legit") 1095 public Boolean preCheckAndPutAfterRowLock( 1096 final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, 1097 final ByteArrayComparable comparator, final Put put) throws IOException { 1098 boolean bypassable = true; 1099 boolean defaultResult = false; 1100 if (coprocEnvironments.isEmpty()) { 1101 return null; 1102 } 1103 return execOperationWithResult( 1104 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1105 defaultResult, bypassable) { 1106 @Override 1107 public Boolean call(RegionObserver observer) throws IOException { 1108 return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, 1109 op, comparator, put, getResult()); 1110 } 1111 }); 1112 } 1113 1114 /** 1115 * @param row row to check 1116 * @param family column family 1117 * @param qualifier column qualifier 1118 * @param op the comparison operation 1119 * @param comparator the comparator 1120 * @param put data to put if check succeeds 1121 * @throws IOException e 1122 */ 1123 public boolean postCheckAndPut(final byte [] row, final byte [] family, 1124 final byte [] qualifier, final CompareOperator op, 1125 final ByteArrayComparable comparator, final Put put, 1126 boolean result) throws IOException { 1127 if (this.coprocEnvironments.isEmpty()) { 1128 return result; 1129 } 1130 return execOperationWithResult( 1131 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1132 @Override 1133 public Boolean call(RegionObserver observer) throws IOException { 1134 return observer.postCheckAndPut(this, row, family, qualifier, 1135 op, comparator, put, getResult()); 1136 } 1137 }); 1138 } 1139 1140 /** 1141 * Supports Coprocessor 'bypass'. 1142 * @param row row to check 1143 * @param family column family 1144 * @param qualifier column qualifier 1145 * @param op the comparison operation 1146 * @param comparator the comparator 1147 * @param delete delete to commit if check succeeds 1148 * @return true or false to return to client if default processing should be bypassed, 1149 * or null otherwise 1150 */ 1151 public Boolean preCheckAndDelete(final byte [] row, final byte [] family, 1152 final byte [] qualifier, final CompareOperator op, 1153 final ByteArrayComparable comparator, final Delete delete) 1154 throws IOException { 1155 boolean bypassable = true; 1156 boolean defaultResult = false; 1157 if (coprocEnvironments.isEmpty()) { 1158 return null; 1159 } 1160 return execOperationWithResult( 1161 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1162 defaultResult, bypassable) { 1163 @Override 1164 public Boolean call(RegionObserver observer) throws IOException { 1165 return observer.preCheckAndDelete(this, row, family, 1166 qualifier, op, comparator, delete, getResult()); 1167 } 1168 }); 1169 } 1170 1171 /** 1172 * Supports Coprocessor 'bypass'. 1173 * @param row row to check 1174 * @param family column family 1175 * @param qualifier column qualifier 1176 * @param op the comparison operation 1177 * @param comparator the comparator 1178 * @param delete delete to commit if check succeeds 1179 * @return true or false to return to client if default processing should be bypassed, 1180 * or null otherwise 1181 */ 1182 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1183 justification="Null is legit") 1184 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, 1185 final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, 1186 final Delete delete) throws IOException { 1187 boolean bypassable = true; 1188 boolean defaultResult = false; 1189 if (coprocEnvironments.isEmpty()) { 1190 return null; 1191 } 1192 return execOperationWithResult( 1193 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1194 defaultResult, bypassable) { 1195 @Override 1196 public Boolean call(RegionObserver observer) throws IOException { 1197 return observer.preCheckAndDeleteAfterRowLock(this, row, 1198 family, qualifier, op, comparator, delete, getResult()); 1199 } 1200 }); 1201 } 1202 1203 /** 1204 * @param row row to check 1205 * @param family column family 1206 * @param qualifier column qualifier 1207 * @param op the comparison operation 1208 * @param comparator the comparator 1209 * @param delete delete to commit if check succeeds 1210 * @throws IOException e 1211 */ 1212 public boolean postCheckAndDelete(final byte [] row, final byte [] family, 1213 final byte [] qualifier, final CompareOperator op, 1214 final ByteArrayComparable comparator, final Delete delete, 1215 boolean result) throws IOException { 1216 if (this.coprocEnvironments.isEmpty()) { 1217 return result; 1218 } 1219 return execOperationWithResult( 1220 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1221 @Override 1222 public Boolean call(RegionObserver observer) throws IOException { 1223 return observer.postCheckAndDelete(this, row, family, 1224 qualifier, op, comparator, delete, getResult()); 1225 } 1226 }); 1227 } 1228 1229 /** 1230 * Supports Coprocessor 'bypass'. 1231 * @param append append object 1232 * @return result to return to client if default operation should be bypassed, null otherwise 1233 * @throws IOException if an error occurred on the coprocessor 1234 */ 1235 public Result preAppend(final Append append) throws IOException { 1236 boolean bypassable = true; 1237 Result defaultResult = null; 1238 if (this.coprocEnvironments.isEmpty()) { 1239 return defaultResult; 1240 } 1241 return execOperationWithResult( 1242 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1243 bypassable) { 1244 @Override 1245 public Result call(RegionObserver observer) throws IOException { 1246 return observer.preAppend(this, append); 1247 } 1248 }); 1249 } 1250 1251 /** 1252 * Supports Coprocessor 'bypass'. 1253 * @param append append object 1254 * @return result to return to client if default operation should be bypassed, null otherwise 1255 * @throws IOException if an error occurred on the coprocessor 1256 */ 1257 public Result preAppendAfterRowLock(final Append append) throws IOException { 1258 boolean bypassable = true; 1259 Result defaultResult = null; 1260 if (this.coprocEnvironments.isEmpty()) { 1261 return defaultResult; 1262 } 1263 return execOperationWithResult( 1264 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1265 defaultResult, bypassable) { 1266 @Override 1267 public Result call(RegionObserver observer) throws IOException { 1268 return observer.preAppendAfterRowLock(this, append); 1269 } 1270 }); 1271 } 1272 1273 /** 1274 * Supports Coprocessor 'bypass'. 1275 * @param increment increment object 1276 * @return result to return to client if default operation should be bypassed, null otherwise 1277 * @throws IOException if an error occurred on the coprocessor 1278 */ 1279 public Result preIncrement(final Increment increment) throws IOException { 1280 boolean bypassable = true; 1281 Result defaultResult = null; 1282 if (coprocEnvironments.isEmpty()) { 1283 return defaultResult; 1284 } 1285 return execOperationWithResult( 1286 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1287 bypassable) { 1288 @Override 1289 public Result call(RegionObserver observer) throws IOException { 1290 return observer.preIncrement(this, increment); 1291 } 1292 }); 1293 } 1294 1295 /** 1296 * Supports Coprocessor 'bypass'. 1297 * @param increment increment object 1298 * @return result to return to client if default operation should be bypassed, null otherwise 1299 * @throws IOException if an error occurred on the coprocessor 1300 */ 1301 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1302 boolean bypassable = true; 1303 Result defaultResult = null; 1304 if (coprocEnvironments.isEmpty()) { 1305 return defaultResult; 1306 } 1307 return execOperationWithResult( 1308 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1309 bypassable) { 1310 @Override 1311 public Result call(RegionObserver observer) throws IOException { 1312 return observer.preIncrementAfterRowLock(this, increment); 1313 } 1314 }); 1315 } 1316 1317 /** 1318 * @param append Append object 1319 * @param result the result returned by the append 1320 * @throws IOException if an error occurred on the coprocessor 1321 */ 1322 public Result postAppend(final Append append, final Result result) throws IOException { 1323 if (this.coprocEnvironments.isEmpty()) { 1324 return result; 1325 } 1326 return execOperationWithResult( 1327 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1328 @Override 1329 public Result call(RegionObserver observer) throws IOException { 1330 return observer.postAppend(this, append, result); 1331 } 1332 }); 1333 } 1334 1335 /** 1336 * @param increment increment object 1337 * @param result the result returned by postIncrement 1338 * @throws IOException if an error occurred on the coprocessor 1339 */ 1340 public Result postIncrement(final Increment increment, Result result) throws IOException { 1341 if (this.coprocEnvironments.isEmpty()) { 1342 return result; 1343 } 1344 return execOperationWithResult( 1345 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1346 @Override 1347 public Result call(RegionObserver observer) throws IOException { 1348 return observer.postIncrement(this, increment, getResult()); 1349 } 1350 }); 1351 } 1352 1353 /** 1354 * @param scan the Scan specification 1355 * @exception IOException Exception 1356 */ 1357 public void preScannerOpen(final Scan scan) throws IOException { 1358 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1359 @Override 1360 public void call(RegionObserver observer) throws IOException { 1361 observer.preScannerOpen(this, scan); 1362 } 1363 }); 1364 } 1365 1366 /** 1367 * @param scan the Scan specification 1368 * @param s the scanner 1369 * @return the scanner instance to use 1370 * @exception IOException Exception 1371 */ 1372 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1373 if (this.coprocEnvironments.isEmpty()) { 1374 return s; 1375 } 1376 return execOperationWithResult( 1377 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1378 @Override 1379 public RegionScanner call(RegionObserver observer) throws IOException { 1380 return observer.postScannerOpen(this, scan, getResult()); 1381 } 1382 }); 1383 } 1384 1385 /** 1386 * @param s the scanner 1387 * @param results the result set returned by the region server 1388 * @param limit the maximum number of results to return 1389 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1390 * @exception IOException Exception 1391 */ 1392 public Boolean preScannerNext(final InternalScanner s, 1393 final List<Result> results, final int limit) throws IOException { 1394 boolean bypassable = true; 1395 boolean defaultResult = false; 1396 if (coprocEnvironments.isEmpty()) { 1397 return null; 1398 } 1399 return execOperationWithResult( 1400 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1401 defaultResult, bypassable) { 1402 @Override 1403 public Boolean call(RegionObserver observer) throws IOException { 1404 return observer.preScannerNext(this, s, results, limit, getResult()); 1405 } 1406 }); 1407 } 1408 1409 /** 1410 * @param s the scanner 1411 * @param results the result set returned by the region server 1412 * @param limit the maximum number of results to return 1413 * @param hasMore 1414 * @return 'has more' indication to give to client 1415 * @exception IOException Exception 1416 */ 1417 public boolean postScannerNext(final InternalScanner s, 1418 final List<Result> results, final int limit, boolean hasMore) 1419 throws IOException { 1420 if (this.coprocEnvironments.isEmpty()) { 1421 return hasMore; 1422 } 1423 return execOperationWithResult( 1424 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1425 @Override 1426 public Boolean call(RegionObserver observer) throws IOException { 1427 return observer.postScannerNext(this, s, results, limit, getResult()); 1428 } 1429 }); 1430 } 1431 1432 /** 1433 * This will be called by the scan flow when the current scanned row is being filtered out by the 1434 * filter. 1435 * @param s the scanner 1436 * @param curRowCell The cell in the current row which got filtered out 1437 * @return whether more rows are available for the scanner or not 1438 * @throws IOException 1439 */ 1440 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1441 throws IOException { 1442 // short circuit for performance 1443 boolean defaultResult = true; 1444 if (!hasCustomPostScannerFilterRow) { 1445 return defaultResult; 1446 } 1447 if (this.coprocEnvironments.isEmpty()) { 1448 return defaultResult; 1449 } 1450 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1451 regionObserverGetter, defaultResult) { 1452 @Override 1453 public Boolean call(RegionObserver observer) throws IOException { 1454 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1455 } 1456 }); 1457 } 1458 1459 /** 1460 * Supports Coprocessor 'bypass'. 1461 * @param s the scanner 1462 * @return true if default behavior should be bypassed, false otherwise 1463 * @exception IOException Exception 1464 */ 1465 // Should this be bypassable? 1466 public boolean preScannerClose(final InternalScanner s) throws IOException { 1467 return execOperation(coprocEnvironments.isEmpty()? null: 1468 new RegionObserverOperationWithoutResult(true) { 1469 @Override 1470 public void call(RegionObserver observer) throws IOException { 1471 observer.preScannerClose(this, s); 1472 } 1473 }); 1474 } 1475 1476 /** 1477 * @exception IOException Exception 1478 */ 1479 public void postScannerClose(final InternalScanner s) throws IOException { 1480 execOperation(coprocEnvironments.isEmpty()? null: 1481 new RegionObserverOperationWithoutResult() { 1482 @Override 1483 public void call(RegionObserver observer) throws IOException { 1484 observer.postScannerClose(this, s); 1485 } 1486 }); 1487 } 1488 1489 /** 1490 * Called before open store scanner for user scan. 1491 */ 1492 public ScanInfo preStoreScannerOpen(HStore store) throws IOException { 1493 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1494 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 1495 execOperation(new RegionObserverOperationWithoutResult() { 1496 @Override 1497 public void call(RegionObserver observer) throws IOException { 1498 observer.preStoreScannerOpen(this, store, builder); 1499 } 1500 }); 1501 return builder.build(); 1502 } 1503 1504 /** 1505 * @param info the RegionInfo for this region 1506 * @param edits the file of recovered edits 1507 */ 1508 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1509 execOperation(coprocEnvironments.isEmpty()? null: 1510 new RegionObserverOperationWithoutResult(true) { 1511 @Override 1512 public void call(RegionObserver observer) throws IOException { 1513 observer.preReplayWALs(this, info, edits); 1514 } 1515 }); 1516 } 1517 1518 /** 1519 * @param info the RegionInfo for this region 1520 * @param edits the file of recovered edits 1521 * @throws IOException Exception 1522 */ 1523 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1524 execOperation(coprocEnvironments.isEmpty()? null: 1525 new RegionObserverOperationWithoutResult() { 1526 @Override 1527 public void call(RegionObserver observer) throws IOException { 1528 observer.postReplayWALs(this, info, edits); 1529 } 1530 }); 1531 } 1532 1533 /** 1534 * Supports Coprocessor 'bypass'. 1535 * @return true if default behavior should be bypassed, false otherwise 1536 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1537 * with something that doesn't expose IntefaceAudience.Private classes. 1538 */ 1539 @Deprecated 1540 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1541 throws IOException { 1542 return execOperation(coprocEnvironments.isEmpty()? null: 1543 new RegionObserverOperationWithoutResult(true) { 1544 @Override 1545 public void call(RegionObserver observer) throws IOException { 1546 observer.preWALRestore(this, info, logKey, logEdit); 1547 } 1548 }); 1549 } 1550 1551 /** 1552 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1553 * with something that doesn't expose IntefaceAudience.Private classes. 1554 */ 1555 @Deprecated 1556 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1557 throws IOException { 1558 execOperation(coprocEnvironments.isEmpty()? null: 1559 new RegionObserverOperationWithoutResult() { 1560 @Override 1561 public void call(RegionObserver observer) throws IOException { 1562 observer.postWALRestore(this, info, logKey, logEdit); 1563 } 1564 }); 1565 } 1566 1567 /** 1568 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1569 */ 1570 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1571 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1572 @Override 1573 public void call(RegionObserver observer) throws IOException { 1574 observer.preBulkLoadHFile(this, familyPaths); 1575 } 1576 }); 1577 } 1578 1579 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1580 throws IOException { 1581 return execOperation(coprocEnvironments.isEmpty()? null: 1582 new RegionObserverOperationWithoutResult() { 1583 @Override 1584 public void call(RegionObserver observer) throws IOException { 1585 observer.preCommitStoreFile(this, family, pairs); 1586 } 1587 }); 1588 } 1589 1590 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1591 execOperation(coprocEnvironments.isEmpty()? null: 1592 new RegionObserverOperationWithoutResult() { 1593 @Override 1594 public void call(RegionObserver observer) throws IOException { 1595 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1596 } 1597 }); 1598 } 1599 1600 /** 1601 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1602 * @param map Map of CF to List of file paths for the final loaded files 1603 * @throws IOException 1604 */ 1605 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1606 Map<byte[], List<Path>> map) throws IOException { 1607 if (this.coprocEnvironments.isEmpty()) { 1608 return; 1609 } 1610 execOperation(coprocEnvironments.isEmpty()? null: 1611 new RegionObserverOperationWithoutResult() { 1612 @Override 1613 public void call(RegionObserver observer) throws IOException { 1614 observer.postBulkLoadHFile(this, familyPaths, map); 1615 } 1616 }); 1617 } 1618 1619 public void postStartRegionOperation(final Operation op) throws IOException { 1620 execOperation(coprocEnvironments.isEmpty()? null: 1621 new RegionObserverOperationWithoutResult() { 1622 @Override 1623 public void call(RegionObserver observer) throws IOException { 1624 observer.postStartRegionOperation(this, op); 1625 } 1626 }); 1627 } 1628 1629 public void postCloseRegionOperation(final Operation op) throws IOException { 1630 execOperation(coprocEnvironments.isEmpty()? null: 1631 new RegionObserverOperationWithoutResult() { 1632 @Override 1633 public void call(RegionObserver observer) throws IOException { 1634 observer.postCloseRegionOperation(this, op); 1635 } 1636 }); 1637 } 1638 1639 /** 1640 * @param fs fileystem to read from 1641 * @param p path to the file 1642 * @param in {@link FSDataInputStreamWrapper} 1643 * @param size Full size of the file 1644 * @param cacheConf 1645 * @param r original reference file. This will be not null only when reading a split file. 1646 * @return a Reader instance to use instead of the base reader if overriding 1647 * default behavior, null otherwise 1648 * @throws IOException 1649 */ 1650 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1651 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1652 final Reference r) throws IOException { 1653 if (coprocEnvironments.isEmpty()) { 1654 return null; 1655 } 1656 return execOperationWithResult( 1657 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1658 @Override 1659 public StoreFileReader call(RegionObserver observer) throws IOException { 1660 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1661 getResult()); 1662 } 1663 }); 1664 } 1665 1666 /** 1667 * @param fs fileystem to read from 1668 * @param p path to the file 1669 * @param in {@link FSDataInputStreamWrapper} 1670 * @param size Full size of the file 1671 * @param cacheConf 1672 * @param r original reference file. This will be not null only when reading a split file. 1673 * @param reader the base reader instance 1674 * @return The reader to use 1675 * @throws IOException 1676 */ 1677 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1678 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1679 final Reference r, final StoreFileReader reader) throws IOException { 1680 if (this.coprocEnvironments.isEmpty()) { 1681 return reader; 1682 } 1683 return execOperationWithResult( 1684 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1685 @Override 1686 public StoreFileReader call(RegionObserver observer) throws IOException { 1687 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1688 getResult()); 1689 } 1690 }); 1691 } 1692 1693 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1694 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1695 if (this.coprocEnvironments.isEmpty()) { 1696 return cellPairs; 1697 } 1698 return execOperationWithResult( 1699 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1700 regionObserverGetter, cellPairs) { 1701 @Override 1702 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1703 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1704 } 1705 }); 1706 } 1707 1708 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1709 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1710 if (this.coprocEnvironments.isEmpty()) { 1711 return cellPairs; 1712 } 1713 return execOperationWithResult( 1714 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1715 regionObserverGetter, cellPairs) { 1716 @Override 1717 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1718 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1719 } 1720 }); 1721 } 1722 1723 public Message preEndpointInvocation(final Service service, final String methodName, 1724 Message request) throws IOException { 1725 if (coprocEnvironments.isEmpty()) { 1726 return request; 1727 } 1728 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1729 Message>(endpointObserverGetter, request) { 1730 @Override 1731 public Message call(EndpointObserver observer) throws IOException { 1732 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1733 } 1734 }); 1735 } 1736 1737 public void postEndpointInvocation(final Service service, final String methodName, 1738 final Message request, final Message.Builder responseBuilder) throws IOException { 1739 execOperation(coprocEnvironments.isEmpty() ? null : 1740 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1741 @Override 1742 public void call(EndpointObserver observer) throws IOException { 1743 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1744 } 1745 }); 1746 } 1747 1748 /** 1749 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1750 */ 1751 @Deprecated 1752 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1753 if (this.coprocEnvironments.isEmpty()) { 1754 return result; 1755 } 1756 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1757 regionObserverGetter, result) { 1758 @Override 1759 public DeleteTracker call(RegionObserver observer) throws IOException { 1760 return observer.postInstantiateDeleteTracker(this, getResult()); 1761 } 1762 }); 1763 } 1764 1765 ///////////////////////////////////////////////////////////////////////////////////////////////// 1766 // BulkLoadObserver hooks 1767 ///////////////////////////////////////////////////////////////////////////////////////////////// 1768 public void prePrepareBulkLoad(User user) throws IOException { 1769 execOperation(coprocEnvironments.isEmpty() ? null : 1770 new BulkLoadObserverOperation(user) { 1771 @Override protected void call(BulkLoadObserver observer) throws IOException { 1772 observer.prePrepareBulkLoad(this); 1773 } 1774 }); 1775 } 1776 1777 public void preCleanupBulkLoad(User user) throws IOException { 1778 execOperation(coprocEnvironments.isEmpty() ? null : 1779 new BulkLoadObserverOperation(user) { 1780 @Override protected void call(BulkLoadObserver observer) throws IOException { 1781 observer.preCleanupBulkLoad(this); 1782 } 1783 }); 1784 } 1785}