001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import com.google.protobuf.Message; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.RawCellBuilder; 040import org.apache.hadoop.hbase.RawCellBuilderFactory; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.SharedConnection; 043import org.apache.hadoop.hbase.client.Append; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Delete; 046import org.apache.hadoop.hbase.client.Durability; 047import org.apache.hadoop.hbase.client.Get; 048import org.apache.hadoop.hbase.client.Increment; 049import org.apache.hadoop.hbase.client.Mutation; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.Scan; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 056import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 057import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 061import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 062import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 063import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 064import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 065import org.apache.hadoop.hbase.coprocessor.ObserverContext; 066import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 068import org.apache.hadoop.hbase.coprocessor.RegionObserver; 069import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; 070import org.apache.hadoop.hbase.filter.ByteArrayComparable; 071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 072import org.apache.hadoop.hbase.io.Reference; 073import org.apache.hadoop.hbase.io.hfile.CacheConfig; 074import org.apache.hadoop.hbase.metrics.MetricRegistry; 075import org.apache.hadoop.hbase.regionserver.Region.Operation; 076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 079import org.apache.hadoop.hbase.security.User; 080import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALKey; 084import org.apache.yetus.audience.InterfaceAudience; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 090 091/** 092 * Implements the coprocessor environment and runtime support for coprocessors 093 * loaded within a {@link Region}. 094 */ 095@InterfaceAudience.Private 096public class RegionCoprocessorHost 097 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 098 099 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 100 // The shared data map 101 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 102 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 103 AbstractReferenceMap.ReferenceStrength.WEAK); 104 105 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 106 private final boolean hasCustomPostScannerFilterRow; 107 108 /** 109 * 110 * Encapsulation of the environment of each coprocessor 111 */ 112 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 113 implements RegionCoprocessorEnvironment { 114 private Region region; 115 ConcurrentMap<String, Object> sharedData; 116 private final MetricRegistry metricRegistry; 117 private final RegionServerServices services; 118 119 /** 120 * Constructor 121 * @param impl the coprocessor instance 122 * @param priority chaining priority 123 */ 124 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 125 final int seq, final Configuration conf, final Region region, 126 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 127 super(impl, priority, seq, conf); 128 this.region = region; 129 this.sharedData = sharedData; 130 this.services = services; 131 this.metricRegistry = 132 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 133 } 134 135 /** @return the region */ 136 @Override 137 public Region getRegion() { 138 return region; 139 } 140 141 @Override 142 public OnlineRegions getOnlineRegions() { 143 return this.services; 144 } 145 146 @Override 147 public Connection getConnection() { 148 // Mocks may have services as null at test time. 149 return services != null ? new SharedConnection(services.getConnection()) : null; 150 } 151 152 @Override 153 public Connection createConnection(Configuration conf) throws IOException { 154 return services != null ? this.services.createConnection(conf) : null; 155 } 156 157 @Override 158 public ServerName getServerName() { 159 return services != null? services.getServerName(): null; 160 } 161 162 @Override 163 public void shutdown() { 164 super.shutdown(); 165 MetricsCoprocessor.removeRegistry(this.metricRegistry); 166 } 167 168 @Override 169 public ConcurrentMap<String, Object> getSharedData() { 170 return sharedData; 171 } 172 173 @Override 174 public RegionInfo getRegionInfo() { 175 return region.getRegionInfo(); 176 } 177 178 @Override 179 public MetricRegistry getMetricRegistryForRegionServer() { 180 return metricRegistry; 181 } 182 183 @Override 184 public RawCellBuilder getCellBuilder() { 185 // We always do a DEEP_COPY only 186 return RawCellBuilderFactory.create(); 187 } 188 } 189 190 /** 191 * Special version of RegionEnvironment that exposes RegionServerServices for Core 192 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 193 */ 194 private static class RegionEnvironmentForCoreCoprocessors extends 195 RegionEnvironment implements HasRegionServerServices { 196 private final RegionServerServices rsServices; 197 198 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 199 final int seq, final Configuration conf, final Region region, 200 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 201 super(impl, priority, seq, conf, region, services, sharedData); 202 this.rsServices = services; 203 } 204 205 /** 206 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 207 * consumption. 208 */ 209 @Override 210 public RegionServerServices getRegionServerServices() { 211 return this.rsServices; 212 } 213 } 214 215 static class TableCoprocessorAttribute { 216 private Path path; 217 private String className; 218 private int priority; 219 private Configuration conf; 220 221 public TableCoprocessorAttribute(Path path, String className, int priority, 222 Configuration conf) { 223 this.path = path; 224 this.className = className; 225 this.priority = priority; 226 this.conf = conf; 227 } 228 229 public Path getPath() { 230 return path; 231 } 232 233 public String getClassName() { 234 return className; 235 } 236 237 public int getPriority() { 238 return priority; 239 } 240 241 public Configuration getConf() { 242 return conf; 243 } 244 } 245 246 /** The region server services */ 247 RegionServerServices rsServices; 248 /** The region */ 249 HRegion region; 250 251 /** 252 * Constructor 253 * @param region the region 254 * @param rsServices interface to available region server functionality 255 * @param conf the configuration 256 */ 257 public RegionCoprocessorHost(final HRegion region, 258 final RegionServerServices rsServices, final Configuration conf) { 259 super(rsServices); 260 this.conf = conf; 261 this.rsServices = rsServices; 262 this.region = region; 263 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 264 265 // load system default cp's from configuration. 266 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 267 268 // load system default cp's for user tables from configuration. 269 if (!region.getRegionInfo().getTable().isSystemTable()) { 270 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 271 } 272 273 // load Coprocessor From HDFS 274 loadTableCoprocessors(conf); 275 276 // now check whether any coprocessor implements postScannerFilterRow 277 boolean hasCustomPostScannerFilterRow = false; 278 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 279 if (env.getInstance() instanceof RegionObserver) { 280 Class<?> clazz = env.getInstance().getClass(); 281 for(;;) { 282 if (clazz == null) { 283 // we must have directly implemented RegionObserver 284 hasCustomPostScannerFilterRow = true; 285 break out; 286 } 287 try { 288 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 289 InternalScanner.class, Cell.class, boolean.class); 290 // this coprocessor has a custom version of postScannerFilterRow 291 hasCustomPostScannerFilterRow = true; 292 break out; 293 } catch (NoSuchMethodException ignore) { 294 } 295 // the deprecated signature still exists 296 try { 297 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 298 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 299 // this coprocessor has a custom version of postScannerFilterRow 300 hasCustomPostScannerFilterRow = true; 301 break out; 302 } catch (NoSuchMethodException ignore) { 303 } 304 clazz = clazz.getSuperclass(); 305 } 306 } 307 } 308 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 309 } 310 311 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 312 TableDescriptor htd) { 313 return htd.getCoprocessorDescriptors().stream().map(cp -> { 314 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 315 Configuration ourConf; 316 if (!cp.getProperties().isEmpty()) { 317 // do an explicit deep copy of the passed configuration 318 ourConf = new Configuration(false); 319 HBaseConfiguration.merge(ourConf, conf); 320 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 321 } else { 322 ourConf = conf; 323 } 324 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 325 }).collect(Collectors.toList()); 326 } 327 328 /** 329 * Sanity check the table coprocessor attributes of the supplied schema. Will 330 * throw an exception if there is a problem. 331 * @param conf 332 * @param htd 333 * @throws IOException 334 */ 335 public static void testTableCoprocessorAttrs(final Configuration conf, 336 final TableDescriptor htd) throws IOException { 337 String pathPrefix = UUID.randomUUID().toString(); 338 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 339 if (attr.getPriority() < 0) { 340 throw new IOException("Priority for coprocessor " + attr.getClassName() + 341 " cannot be less than 0"); 342 } 343 ClassLoader old = Thread.currentThread().getContextClassLoader(); 344 try { 345 ClassLoader cl; 346 if (attr.getPath() != null) { 347 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 348 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 349 } else { 350 cl = CoprocessorHost.class.getClassLoader(); 351 } 352 Thread.currentThread().setContextClassLoader(cl); 353 cl.loadClass(attr.getClassName()); 354 } catch (ClassNotFoundException e) { 355 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 356 } finally { 357 Thread.currentThread().setContextClassLoader(old); 358 } 359 } 360 } 361 362 void loadTableCoprocessors(final Configuration conf) { 363 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 364 DEFAULT_COPROCESSORS_ENABLED); 365 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 366 DEFAULT_USER_COPROCESSORS_ENABLED); 367 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 368 return; 369 } 370 371 // scan the table attributes for coprocessor load specifications 372 // initialize the coprocessors 373 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 374 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 375 region.getTableDescriptor())) { 376 // Load encompasses classloading and coprocessor initialization 377 try { 378 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 379 attr.getPriority(), attr.getConf()); 380 if (env == null) { 381 continue; 382 } 383 configured.add(env); 384 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 385 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 386 } catch (Throwable t) { 387 // Coprocessor failed to load, do we abort on error? 388 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 389 abortServer(attr.getClassName(), t); 390 } else { 391 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 392 } 393 } 394 } 395 // add together to coprocessor set for COW efficiency 396 coprocEnvironments.addAll(configured); 397 } 398 399 @Override 400 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 401 Configuration conf) { 402 // If coprocessor exposes any services, register them. 403 for (Service service : instance.getServices()) { 404 region.registerService(service); 405 } 406 ConcurrentMap<String, Object> classData; 407 // make sure only one thread can add maps 408 synchronized (SHARED_DATA_MAP) { 409 // as long as at least one RegionEnvironment holds on to its classData it will 410 // remain in this map 411 classData = 412 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 413 k -> new ConcurrentHashMap<>()); 414 } 415 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 416 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 417 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 418 rsServices, classData): 419 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 420 } 421 422 @Override 423 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 424 throws InstantiationException, IllegalAccessException { 425 try { 426 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 427 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 428 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 429 // For backward compatibility with old CoprocessorService impl which don't extend 430 // RegionCoprocessor. 431 CoprocessorService cs; 432 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 433 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 434 } else { 435 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 436 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 437 return null; 438 } 439 } catch (NoSuchMethodException | InvocationTargetException e) { 440 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 441 } 442 } 443 444 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 445 RegionCoprocessor::getRegionObserver; 446 447 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 448 RegionCoprocessor::getEndpointObserver; 449 450 abstract class RegionObserverOperationWithoutResult extends 451 ObserverOperationWithoutResult<RegionObserver> { 452 public RegionObserverOperationWithoutResult() { 453 super(regionObserverGetter); 454 } 455 456 public RegionObserverOperationWithoutResult(User user) { 457 super(regionObserverGetter, user); 458 } 459 460 public RegionObserverOperationWithoutResult(boolean bypassable) { 461 super(regionObserverGetter, null, bypassable); 462 } 463 464 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 465 super(regionObserverGetter, user, bypassable); 466 } 467 } 468 469 abstract class BulkLoadObserverOperation extends 470 ObserverOperationWithoutResult<BulkLoadObserver> { 471 public BulkLoadObserverOperation(User user) { 472 super(RegionCoprocessor::getBulkLoadObserver, user); 473 } 474 } 475 476 477 ////////////////////////////////////////////////////////////////////////////////////////////////// 478 // Observer operations 479 ////////////////////////////////////////////////////////////////////////////////////////////////// 480 481 ////////////////////////////////////////////////////////////////////////////////////////////////// 482 // Observer operations 483 ////////////////////////////////////////////////////////////////////////////////////////////////// 484 485 /** 486 * Invoked before a region open. 487 * 488 * @throws IOException Signals that an I/O exception has occurred. 489 */ 490 public void preOpen() throws IOException { 491 if (coprocEnvironments.isEmpty()) { 492 return; 493 } 494 execOperation(new RegionObserverOperationWithoutResult() { 495 @Override 496 public void call(RegionObserver observer) throws IOException { 497 observer.preOpen(this); 498 } 499 }); 500 } 501 502 503 /** 504 * Invoked after a region open 505 */ 506 public void postOpen() { 507 if (coprocEnvironments.isEmpty()) { 508 return; 509 } 510 try { 511 execOperation(new RegionObserverOperationWithoutResult() { 512 @Override 513 public void call(RegionObserver observer) throws IOException { 514 observer.postOpen(this); 515 } 516 }); 517 } catch (IOException e) { 518 LOG.warn(e.toString(), e); 519 } 520 } 521 522 /** 523 * Invoked before a region is closed 524 * @param abortRequested true if the server is aborting 525 */ 526 public void preClose(final boolean abortRequested) throws IOException { 527 execOperation(new RegionObserverOperationWithoutResult() { 528 @Override 529 public void call(RegionObserver observer) throws IOException { 530 observer.preClose(this, abortRequested); 531 } 532 }); 533 } 534 535 /** 536 * Invoked after a region is closed 537 * @param abortRequested true if the server is aborting 538 */ 539 public void postClose(final boolean abortRequested) { 540 try { 541 execOperation(new RegionObserverOperationWithoutResult() { 542 @Override 543 public void call(RegionObserver observer) throws IOException { 544 observer.postClose(this, abortRequested); 545 } 546 547 @Override 548 public void postEnvCall() { 549 shutdown(this.getEnvironment()); 550 } 551 }); 552 } catch (IOException e) { 553 LOG.warn(e.toString(), e); 554 } 555 } 556 557 /** 558 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 559 * available candidates. 560 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 561 * the passed in <code>candidates</code>. 562 * @param store The store where compaction is being requested 563 * @param candidates The currently available store files 564 * @param tracker used to track the life cycle of a compaction 565 * @param user the user 566 * @throws IOException 567 */ 568 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 569 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 570 if (coprocEnvironments.isEmpty()) { 571 return false; 572 } 573 boolean bypassable = true; 574 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 575 @Override 576 public void call(RegionObserver observer) throws IOException { 577 observer.preCompactSelection(this, store, candidates, tracker); 578 } 579 }); 580 } 581 582 /** 583 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 584 * candidates. 585 * @param store The store where compaction is being requested 586 * @param selected The store files selected to compact 587 * @param tracker used to track the life cycle of a compaction 588 * @param request the compaction request 589 * @param user the user 590 */ 591 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 592 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 593 final User user) throws IOException { 594 if (coprocEnvironments.isEmpty()) { 595 return; 596 } 597 execOperation(new RegionObserverOperationWithoutResult(user) { 598 @Override 599 public void call(RegionObserver observer) throws IOException { 600 observer.postCompactSelection(this, store, selected, tracker, request); 601 } 602 }); 603 } 604 605 /** 606 * Called prior to opening store scanner for compaction. 607 */ 608 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 609 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 610 if (coprocEnvironments.isEmpty()) { 611 return store.getScanInfo(); 612 } 613 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 614 execOperation(new RegionObserverOperationWithoutResult(user) { 615 @Override 616 public void call(RegionObserver observer) throws IOException { 617 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 618 } 619 }); 620 return builder.build(); 621 } 622 623 /** 624 * Called prior to rewriting the store files selected for compaction 625 * @param store the store being compacted 626 * @param scanner the scanner used to read store data during compaction 627 * @param scanType type of Scan 628 * @param tracker used to track the life cycle of a compaction 629 * @param request the compaction request 630 * @param user the user 631 * @return Scanner to use (cannot be null!) 632 * @throws IOException 633 */ 634 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 635 final ScanType scanType, final CompactionLifeCycleTracker tracker, 636 final CompactionRequest request, final User user) throws IOException { 637 InternalScanner defaultResult = scanner; 638 if (coprocEnvironments.isEmpty()) { 639 return defaultResult; 640 } 641 return execOperationWithResult( 642 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 643 defaultResult, user) { 644 @Override 645 public InternalScanner call(RegionObserver observer) throws IOException { 646 InternalScanner scanner = 647 observer.preCompact(this, store, getResult(), scanType, tracker, request); 648 if (scanner == null) { 649 throw new CoprocessorException("Null Scanner return disallowed!"); 650 } 651 return scanner; 652 } 653 }); 654 } 655 656 /** 657 * Called after the store compaction has completed. 658 * @param store the store being compacted 659 * @param resultFile the new store file written during compaction 660 * @param tracker used to track the life cycle of a compaction 661 * @param request the compaction request 662 * @param user the user 663 * @throws IOException 664 */ 665 public void postCompact(final HStore store, final HStoreFile resultFile, 666 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 667 throws IOException { 668 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 669 @Override 670 public void call(RegionObserver observer) throws IOException { 671 observer.postCompact(this, store, resultFile, tracker, request); 672 } 673 }); 674 } 675 676 /** 677 * Invoked before create StoreScanner for flush. 678 */ 679 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 680 throws IOException { 681 if (coprocEnvironments.isEmpty()) { 682 return store.getScanInfo(); 683 } 684 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 685 execOperation(new RegionObserverOperationWithoutResult() { 686 @Override 687 public void call(RegionObserver observer) throws IOException { 688 observer.preFlushScannerOpen(this, store, builder, tracker); 689 } 690 }); 691 return builder.build(); 692 } 693 694 /** 695 * Invoked before a memstore flush 696 * @return Scanner to use (cannot be null!) 697 * @throws IOException 698 */ 699 public InternalScanner preFlush(HStore store, InternalScanner scanner, 700 FlushLifeCycleTracker tracker) throws IOException { 701 if (coprocEnvironments.isEmpty()) { 702 return scanner; 703 } 704 return execOperationWithResult( 705 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 706 @Override 707 public InternalScanner call(RegionObserver observer) throws IOException { 708 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 709 if (scanner == null) { 710 throw new CoprocessorException("Null Scanner return disallowed!"); 711 } 712 return scanner; 713 } 714 }); 715 } 716 717 /** 718 * Invoked before a memstore flush 719 * @throws IOException 720 */ 721 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 722 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 723 @Override 724 public void call(RegionObserver observer) throws IOException { 725 observer.preFlush(this, tracker); 726 } 727 }); 728 } 729 730 /** 731 * Invoked after a memstore flush 732 * @throws IOException 733 */ 734 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 735 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 736 @Override 737 public void call(RegionObserver observer) throws IOException { 738 observer.postFlush(this, tracker); 739 } 740 }); 741 } 742 743 /** 744 * Invoked before in memory compaction. 745 */ 746 public void preMemStoreCompaction(HStore store) throws IOException { 747 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 748 @Override 749 public void call(RegionObserver observer) throws IOException { 750 observer.preMemStoreCompaction(this, store); 751 } 752 }); 753 } 754 755 /** 756 * Invoked before create StoreScanner for in memory compaction. 757 */ 758 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 759 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 760 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 761 @Override 762 public void call(RegionObserver observer) throws IOException { 763 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 764 } 765 }); 766 return builder.build(); 767 } 768 769 /** 770 * Invoked before compacting memstore. 771 */ 772 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 773 throws IOException { 774 if (coprocEnvironments.isEmpty()) { 775 return scanner; 776 } 777 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 778 regionObserverGetter, scanner) { 779 @Override 780 public InternalScanner call(RegionObserver observer) throws IOException { 781 return observer.preMemStoreCompactionCompact(this, store, getResult()); 782 } 783 }); 784 } 785 786 /** 787 * Invoked after in memory compaction. 788 */ 789 public void postMemStoreCompaction(HStore store) throws IOException { 790 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 791 @Override 792 public void call(RegionObserver observer) throws IOException { 793 observer.postMemStoreCompaction(this, store); 794 } 795 }); 796 } 797 798 /** 799 * Invoked after a memstore flush 800 * @throws IOException 801 */ 802 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 803 throws IOException { 804 if (coprocEnvironments.isEmpty()) { 805 return; 806 } 807 execOperation(new RegionObserverOperationWithoutResult() { 808 @Override 809 public void call(RegionObserver observer) throws IOException { 810 observer.postFlush(this, store, storeFile, tracker); 811 } 812 }); 813 } 814 815 // RegionObserver support 816 /** 817 * Supports Coprocessor 'bypass'. 818 * @param get the Get request 819 * @param results What to return if return is true/'bypass'. 820 * @return true if default processing should be bypassed. 821 * @exception IOException Exception 822 */ 823 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 824 if (coprocEnvironments.isEmpty()) { 825 return false; 826 } 827 boolean bypassable = true; 828 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 829 @Override 830 public void call(RegionObserver observer) throws IOException { 831 observer.preGetOp(this, get, results); 832 } 833 }); 834 } 835 836 /** 837 * @param get the Get request 838 * @param results the result set 839 * @exception IOException Exception 840 */ 841 public void postGet(final Get get, final List<Cell> results) 842 throws IOException { 843 if (coprocEnvironments.isEmpty()) { 844 return; 845 } 846 execOperation(new RegionObserverOperationWithoutResult() { 847 @Override 848 public void call(RegionObserver observer) throws IOException { 849 observer.postGetOp(this, get, results); 850 } 851 }); 852 } 853 854 /** 855 * Supports Coprocessor 'bypass'. 856 * @param get the Get request 857 * @return true or false to return to client if bypassing normal operation, or null otherwise 858 * @exception IOException Exception 859 */ 860 public Boolean preExists(final Get get) throws IOException { 861 boolean bypassable = true; 862 boolean defaultResult = false; 863 if (coprocEnvironments.isEmpty()) { 864 return null; 865 } 866 return execOperationWithResult( 867 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 868 defaultResult, bypassable) { 869 @Override 870 public Boolean call(RegionObserver observer) throws IOException { 871 return observer.preExists(this, get, getResult()); 872 } 873 }); 874 } 875 876 /** 877 * @param get the Get request 878 * @param result the result returned by the region server 879 * @return the result to return to the client 880 * @exception IOException Exception 881 */ 882 public boolean postExists(final Get get, boolean result) 883 throws IOException { 884 if (this.coprocEnvironments.isEmpty()) { 885 return result; 886 } 887 return execOperationWithResult( 888 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 889 @Override 890 public Boolean call(RegionObserver observer) throws IOException { 891 return observer.postExists(this, get, getResult()); 892 } 893 }); 894 } 895 896 /** 897 * Supports Coprocessor 'bypass'. 898 * @param put The Put object 899 * @param edit The WALEdit object. 900 * @param durability The durability used 901 * @return true if default processing should be bypassed 902 * @exception IOException Exception 903 */ 904 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 905 throws IOException { 906 if (coprocEnvironments.isEmpty()) { 907 return false; 908 } 909 boolean bypassable = true; 910 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 911 @Override 912 public void call(RegionObserver observer) throws IOException { 913 observer.prePut(this, put, edit, durability); 914 } 915 }); 916 } 917 918 /** 919 * Supports Coprocessor 'bypass'. 920 * @param mutation - the current mutation 921 * @param kv - the current cell 922 * @param byteNow - current timestamp in bytes 923 * @param get - the get that could be used 924 * Note that the get only does not specify the family and qualifier that should be used 925 * @return true if default processing should be bypassed 926 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 927 * Coprocessor for its needs only. Will be removed. 928 */ 929 @Deprecated 930 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 931 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 932 if (coprocEnvironments.isEmpty()) { 933 return false; 934 } 935 boolean bypassable = true; 936 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 937 @Override 938 public void call(RegionObserver observer) throws IOException { 939 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 940 } 941 }); 942 } 943 944 /** 945 * @param put The Put object 946 * @param edit The WALEdit object. 947 * @param durability The durability used 948 * @exception IOException Exception 949 */ 950 public void postPut(final Put put, final WALEdit edit, final Durability durability) 951 throws IOException { 952 if (coprocEnvironments.isEmpty()) { 953 return; 954 } 955 execOperation(new RegionObserverOperationWithoutResult() { 956 @Override 957 public void call(RegionObserver observer) throws IOException { 958 observer.postPut(this, put, edit, durability); 959 } 960 }); 961 } 962 963 /** 964 * Supports Coprocessor 'bypass'. 965 * @param delete The Delete object 966 * @param edit The WALEdit object. 967 * @param durability The durability used 968 * @return true if default processing should be bypassed 969 * @exception IOException Exception 970 */ 971 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 972 throws IOException { 973 if (this.coprocEnvironments.isEmpty()) { 974 return false; 975 } 976 boolean bypassable = true; 977 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 978 @Override 979 public void call(RegionObserver observer) throws IOException { 980 observer.preDelete(this, delete, edit, durability); 981 } 982 }); 983 } 984 985 /** 986 * @param delete The Delete object 987 * @param edit The WALEdit object. 988 * @param durability The durability used 989 * @exception IOException Exception 990 */ 991 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 992 throws IOException { 993 execOperation(coprocEnvironments.isEmpty()? null: 994 new RegionObserverOperationWithoutResult() { 995 @Override 996 public void call(RegionObserver observer) throws IOException { 997 observer.postDelete(this, delete, edit, durability); 998 } 999 }); 1000 } 1001 1002 public void preBatchMutate( 1003 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1004 if(this.coprocEnvironments.isEmpty()) { 1005 return; 1006 } 1007 execOperation(new RegionObserverOperationWithoutResult() { 1008 @Override 1009 public void call(RegionObserver observer) throws IOException { 1010 observer.preBatchMutate(this, miniBatchOp); 1011 } 1012 }); 1013 } 1014 1015 public void postBatchMutate( 1016 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1017 if (this.coprocEnvironments.isEmpty()) { 1018 return; 1019 } 1020 execOperation(new RegionObserverOperationWithoutResult() { 1021 @Override 1022 public void call(RegionObserver observer) throws IOException { 1023 observer.postBatchMutate(this, miniBatchOp); 1024 } 1025 }); 1026 } 1027 1028 public void postBatchMutateIndispensably( 1029 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1030 throws IOException { 1031 if (this.coprocEnvironments.isEmpty()) { 1032 return; 1033 } 1034 execOperation(new RegionObserverOperationWithoutResult() { 1035 @Override 1036 public void call(RegionObserver observer) throws IOException { 1037 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1038 } 1039 }); 1040 } 1041 1042 /** 1043 * Supports Coprocessor 'bypass'. 1044 * @param row row to check 1045 * @param family column family 1046 * @param qualifier column qualifier 1047 * @param op the comparison operation 1048 * @param comparator the comparator 1049 * @param put data to put if check succeeds 1050 * @return true or false to return to client if default processing should be bypassed, or null 1051 * otherwise 1052 */ 1053 public Boolean preCheckAndPut(final byte [] row, final byte [] family, 1054 final byte [] qualifier, final CompareOperator op, 1055 final ByteArrayComparable comparator, final Put put) 1056 throws IOException { 1057 boolean bypassable = true; 1058 boolean defaultResult = false; 1059 if (coprocEnvironments.isEmpty()) { 1060 return null; 1061 } 1062 return execOperationWithResult( 1063 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1064 defaultResult, bypassable) { 1065 @Override 1066 public Boolean call(RegionObserver observer) throws IOException { 1067 return observer.preCheckAndPut(this, row, family, qualifier, 1068 op, comparator, put, getResult()); 1069 } 1070 }); 1071 } 1072 1073 /** 1074 * Supports Coprocessor 'bypass'. 1075 * @param row row to check 1076 * @param family column family 1077 * @param qualifier column qualifier 1078 * @param op the comparison operation 1079 * @param comparator the comparator 1080 * @param put data to put if check succeeds 1081 * @return true or false to return to client if default processing should be bypassed, or null 1082 * otherwise 1083 */ 1084 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1085 justification="Null is legit") 1086 public Boolean preCheckAndPutAfterRowLock( 1087 final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, 1088 final ByteArrayComparable comparator, final Put put) throws IOException { 1089 boolean bypassable = true; 1090 boolean defaultResult = false; 1091 if (coprocEnvironments.isEmpty()) { 1092 return null; 1093 } 1094 return execOperationWithResult( 1095 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1096 defaultResult, bypassable) { 1097 @Override 1098 public Boolean call(RegionObserver observer) throws IOException { 1099 return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, 1100 op, comparator, put, getResult()); 1101 } 1102 }); 1103 } 1104 1105 /** 1106 * @param row row to check 1107 * @param family column family 1108 * @param qualifier column qualifier 1109 * @param op the comparison operation 1110 * @param comparator the comparator 1111 * @param put data to put if check succeeds 1112 * @throws IOException e 1113 */ 1114 public boolean postCheckAndPut(final byte [] row, final byte [] family, 1115 final byte [] qualifier, final CompareOperator op, 1116 final ByteArrayComparable comparator, final Put put, 1117 boolean result) throws IOException { 1118 if (this.coprocEnvironments.isEmpty()) { 1119 return result; 1120 } 1121 return execOperationWithResult( 1122 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1123 @Override 1124 public Boolean call(RegionObserver observer) throws IOException { 1125 return observer.postCheckAndPut(this, row, family, qualifier, 1126 op, comparator, put, getResult()); 1127 } 1128 }); 1129 } 1130 1131 /** 1132 * Supports Coprocessor 'bypass'. 1133 * @param row row to check 1134 * @param family column family 1135 * @param qualifier column qualifier 1136 * @param op the comparison operation 1137 * @param comparator the comparator 1138 * @param delete delete to commit if check succeeds 1139 * @return true or false to return to client if default processing should be bypassed, 1140 * or null otherwise 1141 */ 1142 public Boolean preCheckAndDelete(final byte [] row, final byte [] family, 1143 final byte [] qualifier, final CompareOperator op, 1144 final ByteArrayComparable comparator, final Delete delete) 1145 throws IOException { 1146 boolean bypassable = true; 1147 boolean defaultResult = false; 1148 if (coprocEnvironments.isEmpty()) { 1149 return null; 1150 } 1151 return execOperationWithResult( 1152 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1153 defaultResult, bypassable) { 1154 @Override 1155 public Boolean call(RegionObserver observer) throws IOException { 1156 return observer.preCheckAndDelete(this, row, family, 1157 qualifier, op, comparator, delete, getResult()); 1158 } 1159 }); 1160 } 1161 1162 /** 1163 * Supports Coprocessor 'bypass'. 1164 * @param row row to check 1165 * @param family column family 1166 * @param qualifier column qualifier 1167 * @param op the comparison operation 1168 * @param comparator the comparator 1169 * @param delete delete to commit if check succeeds 1170 * @return true or false to return to client if default processing should be bypassed, 1171 * or null otherwise 1172 */ 1173 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1174 justification="Null is legit") 1175 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, 1176 final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, 1177 final Delete delete) throws IOException { 1178 boolean bypassable = true; 1179 boolean defaultResult = false; 1180 if (coprocEnvironments.isEmpty()) { 1181 return null; 1182 } 1183 return execOperationWithResult( 1184 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1185 defaultResult, bypassable) { 1186 @Override 1187 public Boolean call(RegionObserver observer) throws IOException { 1188 return observer.preCheckAndDeleteAfterRowLock(this, row, 1189 family, qualifier, op, comparator, delete, getResult()); 1190 } 1191 }); 1192 } 1193 1194 /** 1195 * @param row row to check 1196 * @param family column family 1197 * @param qualifier column qualifier 1198 * @param op the comparison operation 1199 * @param comparator the comparator 1200 * @param delete delete to commit if check succeeds 1201 * @throws IOException e 1202 */ 1203 public boolean postCheckAndDelete(final byte [] row, final byte [] family, 1204 final byte [] qualifier, final CompareOperator op, 1205 final ByteArrayComparable comparator, final Delete delete, 1206 boolean result) throws IOException { 1207 if (this.coprocEnvironments.isEmpty()) { 1208 return result; 1209 } 1210 return execOperationWithResult( 1211 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1212 @Override 1213 public Boolean call(RegionObserver observer) throws IOException { 1214 return observer.postCheckAndDelete(this, row, family, 1215 qualifier, op, comparator, delete, getResult()); 1216 } 1217 }); 1218 } 1219 1220 /** 1221 * Supports Coprocessor 'bypass'. 1222 * @param append append object 1223 * @return result to return to client if default operation should be bypassed, null otherwise 1224 * @throws IOException if an error occurred on the coprocessor 1225 */ 1226 public Result preAppend(final Append append) throws IOException { 1227 boolean bypassable = true; 1228 Result defaultResult = null; 1229 if (this.coprocEnvironments.isEmpty()) { 1230 return defaultResult; 1231 } 1232 return execOperationWithResult( 1233 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1234 bypassable) { 1235 @Override 1236 public Result call(RegionObserver observer) throws IOException { 1237 return observer.preAppend(this, append); 1238 } 1239 }); 1240 } 1241 1242 /** 1243 * Supports Coprocessor 'bypass'. 1244 * @param append append object 1245 * @return result to return to client if default operation should be bypassed, null otherwise 1246 * @throws IOException if an error occurred on the coprocessor 1247 */ 1248 public Result preAppendAfterRowLock(final Append append) throws IOException { 1249 boolean bypassable = true; 1250 Result defaultResult = null; 1251 if (this.coprocEnvironments.isEmpty()) { 1252 return defaultResult; 1253 } 1254 return execOperationWithResult( 1255 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1256 defaultResult, bypassable) { 1257 @Override 1258 public Result call(RegionObserver observer) throws IOException { 1259 return observer.preAppendAfterRowLock(this, append); 1260 } 1261 }); 1262 } 1263 1264 /** 1265 * Supports Coprocessor 'bypass'. 1266 * @param increment increment object 1267 * @return result to return to client if default operation should be bypassed, null otherwise 1268 * @throws IOException if an error occurred on the coprocessor 1269 */ 1270 public Result preIncrement(final Increment increment) throws IOException { 1271 boolean bypassable = true; 1272 Result defaultResult = null; 1273 if (coprocEnvironments.isEmpty()) { 1274 return defaultResult; 1275 } 1276 return execOperationWithResult( 1277 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1278 bypassable) { 1279 @Override 1280 public Result call(RegionObserver observer) throws IOException { 1281 return observer.preIncrement(this, increment); 1282 } 1283 }); 1284 } 1285 1286 /** 1287 * Supports Coprocessor 'bypass'. 1288 * @param increment increment object 1289 * @return result to return to client if default operation should be bypassed, null otherwise 1290 * @throws IOException if an error occurred on the coprocessor 1291 */ 1292 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1293 boolean bypassable = true; 1294 Result defaultResult = null; 1295 if (coprocEnvironments.isEmpty()) { 1296 return defaultResult; 1297 } 1298 return execOperationWithResult( 1299 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1300 bypassable) { 1301 @Override 1302 public Result call(RegionObserver observer) throws IOException { 1303 return observer.preIncrementAfterRowLock(this, increment); 1304 } 1305 }); 1306 } 1307 1308 /** 1309 * @param append Append object 1310 * @param result the result returned by the append 1311 * @throws IOException if an error occurred on the coprocessor 1312 */ 1313 public Result postAppend(final Append append, final Result result) throws IOException { 1314 if (this.coprocEnvironments.isEmpty()) { 1315 return result; 1316 } 1317 return execOperationWithResult( 1318 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1319 @Override 1320 public Result call(RegionObserver observer) throws IOException { 1321 return observer.postAppend(this, append, result); 1322 } 1323 }); 1324 } 1325 1326 /** 1327 * @param increment increment object 1328 * @param result the result returned by postIncrement 1329 * @throws IOException if an error occurred on the coprocessor 1330 */ 1331 public Result postIncrement(final Increment increment, Result result) throws IOException { 1332 if (this.coprocEnvironments.isEmpty()) { 1333 return result; 1334 } 1335 return execOperationWithResult( 1336 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1337 @Override 1338 public Result call(RegionObserver observer) throws IOException { 1339 return observer.postIncrement(this, increment, getResult()); 1340 } 1341 }); 1342 } 1343 1344 /** 1345 * @param scan the Scan specification 1346 * @exception IOException Exception 1347 */ 1348 public void preScannerOpen(final Scan scan) throws IOException { 1349 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1350 @Override 1351 public void call(RegionObserver observer) throws IOException { 1352 observer.preScannerOpen(this, scan); 1353 } 1354 }); 1355 } 1356 1357 /** 1358 * @param scan the Scan specification 1359 * @param s the scanner 1360 * @return the scanner instance to use 1361 * @exception IOException Exception 1362 */ 1363 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1364 if (this.coprocEnvironments.isEmpty()) { 1365 return s; 1366 } 1367 return execOperationWithResult( 1368 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1369 @Override 1370 public RegionScanner call(RegionObserver observer) throws IOException { 1371 return observer.postScannerOpen(this, scan, getResult()); 1372 } 1373 }); 1374 } 1375 1376 /** 1377 * @param s the scanner 1378 * @param results the result set returned by the region server 1379 * @param limit the maximum number of results to return 1380 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1381 * @exception IOException Exception 1382 */ 1383 public Boolean preScannerNext(final InternalScanner s, 1384 final List<Result> results, final int limit) throws IOException { 1385 boolean bypassable = true; 1386 boolean defaultResult = false; 1387 if (coprocEnvironments.isEmpty()) { 1388 return null; 1389 } 1390 return execOperationWithResult( 1391 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1392 defaultResult, bypassable) { 1393 @Override 1394 public Boolean call(RegionObserver observer) throws IOException { 1395 return observer.preScannerNext(this, s, results, limit, getResult()); 1396 } 1397 }); 1398 } 1399 1400 /** 1401 * @param s the scanner 1402 * @param results the result set returned by the region server 1403 * @param limit the maximum number of results to return 1404 * @param hasMore 1405 * @return 'has more' indication to give to client 1406 * @exception IOException Exception 1407 */ 1408 public boolean postScannerNext(final InternalScanner s, 1409 final List<Result> results, final int limit, boolean hasMore) 1410 throws IOException { 1411 if (this.coprocEnvironments.isEmpty()) { 1412 return hasMore; 1413 } 1414 return execOperationWithResult( 1415 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1416 @Override 1417 public Boolean call(RegionObserver observer) throws IOException { 1418 return observer.postScannerNext(this, s, results, limit, getResult()); 1419 } 1420 }); 1421 } 1422 1423 /** 1424 * This will be called by the scan flow when the current scanned row is being filtered out by the 1425 * filter. 1426 * @param s the scanner 1427 * @param curRowCell The cell in the current row which got filtered out 1428 * @return whether more rows are available for the scanner or not 1429 * @throws IOException 1430 */ 1431 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1432 throws IOException { 1433 // short circuit for performance 1434 boolean defaultResult = true; 1435 if (!hasCustomPostScannerFilterRow) { 1436 return defaultResult; 1437 } 1438 if (this.coprocEnvironments.isEmpty()) { 1439 return defaultResult; 1440 } 1441 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1442 regionObserverGetter, defaultResult) { 1443 @Override 1444 public Boolean call(RegionObserver observer) throws IOException { 1445 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1446 } 1447 }); 1448 } 1449 1450 /** 1451 * Supports Coprocessor 'bypass'. 1452 * @param s the scanner 1453 * @return true if default behavior should be bypassed, false otherwise 1454 * @exception IOException Exception 1455 */ 1456 // Should this be bypassable? 1457 public boolean preScannerClose(final InternalScanner s) throws IOException { 1458 return execOperation(coprocEnvironments.isEmpty()? null: 1459 new RegionObserverOperationWithoutResult(true) { 1460 @Override 1461 public void call(RegionObserver observer) throws IOException { 1462 observer.preScannerClose(this, s); 1463 } 1464 }); 1465 } 1466 1467 /** 1468 * @exception IOException Exception 1469 */ 1470 public void postScannerClose(final InternalScanner s) throws IOException { 1471 execOperation(coprocEnvironments.isEmpty()? null: 1472 new RegionObserverOperationWithoutResult() { 1473 @Override 1474 public void call(RegionObserver observer) throws IOException { 1475 observer.postScannerClose(this, s); 1476 } 1477 }); 1478 } 1479 1480 /** 1481 * Called before open store scanner for user scan. 1482 */ 1483 public ScanInfo preStoreScannerOpen(HStore store) throws IOException { 1484 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1485 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 1486 execOperation(new RegionObserverOperationWithoutResult() { 1487 @Override 1488 public void call(RegionObserver observer) throws IOException { 1489 observer.preStoreScannerOpen(this, store, builder); 1490 } 1491 }); 1492 return builder.build(); 1493 } 1494 1495 /** 1496 * @param info the RegionInfo for this region 1497 * @param edits the file of recovered edits 1498 */ 1499 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1500 execOperation(coprocEnvironments.isEmpty()? null: 1501 new RegionObserverOperationWithoutResult(true) { 1502 @Override 1503 public void call(RegionObserver observer) throws IOException { 1504 observer.preReplayWALs(this, info, edits); 1505 } 1506 }); 1507 } 1508 1509 /** 1510 * @param info the RegionInfo for this region 1511 * @param edits the file of recovered edits 1512 * @throws IOException Exception 1513 */ 1514 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1515 execOperation(coprocEnvironments.isEmpty()? null: 1516 new RegionObserverOperationWithoutResult() { 1517 @Override 1518 public void call(RegionObserver observer) throws IOException { 1519 observer.postReplayWALs(this, info, edits); 1520 } 1521 }); 1522 } 1523 1524 /** 1525 * Supports Coprocessor 'bypass'. 1526 * @return true if default behavior should be bypassed, false otherwise 1527 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1528 * with something that doesn't expose IntefaceAudience.Private classes. 1529 */ 1530 @Deprecated 1531 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1532 throws IOException { 1533 return execOperation(coprocEnvironments.isEmpty()? null: 1534 new RegionObserverOperationWithoutResult(true) { 1535 @Override 1536 public void call(RegionObserver observer) throws IOException { 1537 observer.preWALRestore(this, info, logKey, logEdit); 1538 } 1539 }); 1540 } 1541 1542 /** 1543 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1544 * with something that doesn't expose IntefaceAudience.Private classes. 1545 */ 1546 @Deprecated 1547 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1548 throws IOException { 1549 execOperation(coprocEnvironments.isEmpty()? null: 1550 new RegionObserverOperationWithoutResult() { 1551 @Override 1552 public void call(RegionObserver observer) throws IOException { 1553 observer.postWALRestore(this, info, logKey, logEdit); 1554 } 1555 }); 1556 } 1557 1558 /** 1559 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1560 */ 1561 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1562 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1563 @Override 1564 public void call(RegionObserver observer) throws IOException { 1565 observer.preBulkLoadHFile(this, familyPaths); 1566 } 1567 }); 1568 } 1569 1570 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1571 throws IOException { 1572 return execOperation(coprocEnvironments.isEmpty()? null: 1573 new RegionObserverOperationWithoutResult() { 1574 @Override 1575 public void call(RegionObserver observer) throws IOException { 1576 observer.preCommitStoreFile(this, family, pairs); 1577 } 1578 }); 1579 } 1580 1581 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1582 execOperation(coprocEnvironments.isEmpty()? null: 1583 new RegionObserverOperationWithoutResult() { 1584 @Override 1585 public void call(RegionObserver observer) throws IOException { 1586 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1587 } 1588 }); 1589 } 1590 1591 /** 1592 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1593 * @param map Map of CF to List of file paths for the final loaded files 1594 * @throws IOException 1595 */ 1596 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1597 Map<byte[], List<Path>> map) throws IOException { 1598 if (this.coprocEnvironments.isEmpty()) { 1599 return; 1600 } 1601 execOperation(coprocEnvironments.isEmpty()? null: 1602 new RegionObserverOperationWithoutResult() { 1603 @Override 1604 public void call(RegionObserver observer) throws IOException { 1605 observer.postBulkLoadHFile(this, familyPaths, map); 1606 } 1607 }); 1608 } 1609 1610 public void postStartRegionOperation(final Operation op) throws IOException { 1611 execOperation(coprocEnvironments.isEmpty()? null: 1612 new RegionObserverOperationWithoutResult() { 1613 @Override 1614 public void call(RegionObserver observer) throws IOException { 1615 observer.postStartRegionOperation(this, op); 1616 } 1617 }); 1618 } 1619 1620 public void postCloseRegionOperation(final Operation op) throws IOException { 1621 execOperation(coprocEnvironments.isEmpty()? null: 1622 new RegionObserverOperationWithoutResult() { 1623 @Override 1624 public void call(RegionObserver observer) throws IOException { 1625 observer.postCloseRegionOperation(this, op); 1626 } 1627 }); 1628 } 1629 1630 /** 1631 * @param fs fileystem to read from 1632 * @param p path to the file 1633 * @param in {@link FSDataInputStreamWrapper} 1634 * @param size Full size of the file 1635 * @param cacheConf 1636 * @param r original reference file. This will be not null only when reading a split file. 1637 * @return a Reader instance to use instead of the base reader if overriding 1638 * default behavior, null otherwise 1639 * @throws IOException 1640 */ 1641 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1642 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1643 final Reference r) throws IOException { 1644 if (coprocEnvironments.isEmpty()) { 1645 return null; 1646 } 1647 return execOperationWithResult( 1648 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1649 @Override 1650 public StoreFileReader call(RegionObserver observer) throws IOException { 1651 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1652 getResult()); 1653 } 1654 }); 1655 } 1656 1657 /** 1658 * @param fs fileystem to read from 1659 * @param p path to the file 1660 * @param in {@link FSDataInputStreamWrapper} 1661 * @param size Full size of the file 1662 * @param cacheConf 1663 * @param r original reference file. This will be not null only when reading a split file. 1664 * @param reader the base reader instance 1665 * @return The reader to use 1666 * @throws IOException 1667 */ 1668 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1669 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1670 final Reference r, final StoreFileReader reader) throws IOException { 1671 if (this.coprocEnvironments.isEmpty()) { 1672 return reader; 1673 } 1674 return execOperationWithResult( 1675 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1676 @Override 1677 public StoreFileReader call(RegionObserver observer) throws IOException { 1678 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1679 getResult()); 1680 } 1681 }); 1682 } 1683 1684 public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, 1685 final Cell oldCell, Cell newCell) throws IOException { 1686 if (this.coprocEnvironments.isEmpty()) { 1687 return newCell; 1688 } 1689 return execOperationWithResult( 1690 new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) { 1691 @Override 1692 public Cell call(RegionObserver observer) throws IOException { 1693 return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); 1694 } 1695 }); 1696 } 1697 1698 public Message preEndpointInvocation(final Service service, final String methodName, 1699 Message request) throws IOException { 1700 if (coprocEnvironments.isEmpty()) { 1701 return request; 1702 } 1703 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1704 Message>(endpointObserverGetter, request) { 1705 @Override 1706 public Message call(EndpointObserver observer) throws IOException { 1707 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1708 } 1709 }); 1710 } 1711 1712 public void postEndpointInvocation(final Service service, final String methodName, 1713 final Message request, final Message.Builder responseBuilder) throws IOException { 1714 execOperation(coprocEnvironments.isEmpty() ? null : 1715 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1716 @Override 1717 public void call(EndpointObserver observer) throws IOException { 1718 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1719 } 1720 }); 1721 } 1722 1723 /** 1724 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1725 */ 1726 @Deprecated 1727 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1728 if (this.coprocEnvironments.isEmpty()) { 1729 return result; 1730 } 1731 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1732 regionObserverGetter, result) { 1733 @Override 1734 public DeleteTracker call(RegionObserver observer) throws IOException { 1735 return observer.postInstantiateDeleteTracker(this, getResult()); 1736 } 1737 }); 1738 } 1739 1740 ///////////////////////////////////////////////////////////////////////////////////////////////// 1741 // BulkLoadObserver hooks 1742 ///////////////////////////////////////////////////////////////////////////////////////////////// 1743 public void prePrepareBulkLoad(User user) throws IOException { 1744 execOperation(coprocEnvironments.isEmpty() ? null : 1745 new BulkLoadObserverOperation(user) { 1746 @Override protected void call(BulkLoadObserver observer) throws IOException { 1747 observer.prePrepareBulkLoad(this); 1748 } 1749 }); 1750 } 1751 1752 public void preCleanupBulkLoad(User user) throws IOException { 1753 execOperation(coprocEnvironments.isEmpty() ? null : 1754 new BulkLoadObserverOperation(user) { 1755 @Override protected void call(BulkLoadObserver observer) throws IOException { 1756 observer.preCleanupBulkLoad(this); 1757 } 1758 }); 1759 } 1760}