001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.protobuf.Message; 021import com.google.protobuf.Service; 022import java.io.IOException; 023import java.lang.reflect.InvocationTargetException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.RawCellBuilder; 038import org.apache.hadoop.hbase.RawCellBuilderFactory; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.SharedConnection; 041import org.apache.hadoop.hbase.client.Append; 042import org.apache.hadoop.hbase.client.CheckAndMutate; 043import org.apache.hadoop.hbase.client.CheckAndMutateResult; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Delete; 046import org.apache.hadoop.hbase.client.Get; 047import org.apache.hadoop.hbase.client.Increment; 048import org.apache.hadoop.hbase.client.Mutation; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.TableDescriptor; 054import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 055import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 056import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 060import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 061import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 062import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 063import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 064import org.apache.hadoop.hbase.coprocessor.ObserverContext; 065import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 066import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 067import org.apache.hadoop.hbase.coprocessor.RegionObserver; 068import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 069import org.apache.hadoop.hbase.io.Reference; 070import org.apache.hadoop.hbase.io.hfile.CacheConfig; 071import org.apache.hadoop.hbase.metrics.MetricRegistry; 072import org.apache.hadoop.hbase.regionserver.Region.Operation; 073import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 074import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 075import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 076import org.apache.hadoop.hbase.security.User; 077import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 078import org.apache.hadoop.hbase.util.Pair; 079import org.apache.hadoop.hbase.wal.WALEdit; 080import org.apache.hadoop.hbase.wal.WALKey; 081import org.apache.yetus.audience.InterfaceAudience; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 086import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 087 088/** 089 * Implements the coprocessor environment and runtime support for coprocessors loaded within a 090 * {@link Region}. 091 */ 092@InterfaceAudience.Private 093public class RegionCoprocessorHost 094 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 095 096 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 097 // The shared data map 098 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 099 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 100 AbstractReferenceMap.ReferenceStrength.WEAK); 101 102 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 103 private final boolean hasCustomPostScannerFilterRow; 104 105 /* 106 * Whether any configured CPs override postScannerFilterRow hook 107 */ 108 public boolean hasCustomPostScannerFilterRow() { 109 return hasCustomPostScannerFilterRow; 110 } 111 112 /** 113 * Encapsulation of the environment of each coprocessor 114 */ 115 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 116 implements RegionCoprocessorEnvironment { 117 private Region region; 118 ConcurrentMap<String, Object> sharedData; 119 private final MetricRegistry metricRegistry; 120 private final RegionServerServices services; 121 122 /** 123 * Constructor 124 * @param impl the coprocessor instance 125 * @param priority chaining priority 126 */ 127 public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq, 128 final Configuration conf, final Region region, final RegionServerServices services, 129 final ConcurrentMap<String, Object> sharedData) { 130 super(impl, priority, seq, conf); 131 this.region = region; 132 this.sharedData = sharedData; 133 this.services = services; 134 this.metricRegistry = 135 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 136 } 137 138 /** Returns the region */ 139 @Override 140 public Region getRegion() { 141 return region; 142 } 143 144 @Override 145 public OnlineRegions getOnlineRegions() { 146 return this.services; 147 } 148 149 @Override 150 public Connection getConnection() { 151 // Mocks may have services as null at test time. 152 return services != null ? new SharedConnection(services.getConnection()) : null; 153 } 154 155 @Override 156 public Connection createConnection(Configuration conf) throws IOException { 157 return services != null ? this.services.createConnection(conf) : null; 158 } 159 160 @Override 161 public ServerName getServerName() { 162 return services != null ? services.getServerName() : null; 163 } 164 165 @Override 166 public void shutdown() { 167 super.shutdown(); 168 MetricsCoprocessor.removeRegistry(this.metricRegistry); 169 } 170 171 @Override 172 public ConcurrentMap<String, Object> getSharedData() { 173 return sharedData; 174 } 175 176 @Override 177 public RegionInfo getRegionInfo() { 178 return region.getRegionInfo(); 179 } 180 181 @Override 182 public MetricRegistry getMetricRegistryForRegionServer() { 183 return metricRegistry; 184 } 185 186 @Override 187 public RawCellBuilder getCellBuilder() { 188 // We always do a DEEP_COPY only 189 return RawCellBuilderFactory.create(); 190 } 191 } 192 193 /** 194 * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors 195 * only. Temporary hack until Core Coprocessors are integrated into Core. 196 */ 197 private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment 198 implements HasRegionServerServices { 199 private final RegionServerServices rsServices; 200 201 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 202 final int seq, final Configuration conf, final Region region, 203 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 204 super(impl, priority, seq, conf, region, services, sharedData); 205 this.rsServices = services; 206 } 207 208 /** 209 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 210 * consumption. 211 */ 212 @Override 213 public RegionServerServices getRegionServerServices() { 214 return this.rsServices; 215 } 216 } 217 218 static class TableCoprocessorAttribute { 219 private Path path; 220 private String className; 221 private int priority; 222 private Configuration conf; 223 224 public TableCoprocessorAttribute(Path path, String className, int priority, 225 Configuration conf) { 226 this.path = path; 227 this.className = className; 228 this.priority = priority; 229 this.conf = conf; 230 } 231 232 public Path getPath() { 233 return path; 234 } 235 236 public String getClassName() { 237 return className; 238 } 239 240 public int getPriority() { 241 return priority; 242 } 243 244 public Configuration getConf() { 245 return conf; 246 } 247 } 248 249 /** The region server services */ 250 RegionServerServices rsServices; 251 /** The region */ 252 HRegion region; 253 254 /** 255 * Constructor 256 * @param region the region 257 * @param rsServices interface to available region server functionality 258 * @param conf the configuration 259 */ 260 @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization 261 public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices, 262 final Configuration conf) { 263 super(rsServices); 264 this.conf = conf; 265 this.rsServices = rsServices; 266 this.region = region; 267 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 268 269 // load system default cp's from configuration. 270 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 271 272 // load system default cp's for user tables from configuration. 273 if (!region.getRegionInfo().getTable().isSystemTable()) { 274 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 275 } 276 277 // load Coprocessor From HDFS 278 loadTableCoprocessors(conf); 279 280 // now check whether any coprocessor implements postScannerFilterRow 281 boolean hasCustomPostScannerFilterRow = false; 282 out: for (RegionCoprocessorEnvironment env : coprocEnvironments) { 283 if (env.getInstance() instanceof RegionObserver) { 284 Class<?> clazz = env.getInstance().getClass(); 285 for (;;) { 286 if (clazz == Object.class) { 287 // we dont need to look postScannerFilterRow into Object class 288 break; // break the inner loop 289 } 290 try { 291 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 292 InternalScanner.class, Cell.class, boolean.class); 293 // this coprocessor has a custom version of postScannerFilterRow 294 hasCustomPostScannerFilterRow = true; 295 break out; 296 } catch (NoSuchMethodException ignore) { 297 } 298 // the deprecated signature still exists 299 try { 300 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 301 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 302 // this coprocessor has a custom version of postScannerFilterRow 303 hasCustomPostScannerFilterRow = true; 304 break out; 305 } catch (NoSuchMethodException ignore) { 306 } 307 clazz = clazz.getSuperclass(); 308 } 309 } 310 } 311 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 312 } 313 314 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 315 TableDescriptor htd) { 316 return htd.getCoprocessorDescriptors().stream().map(cp -> { 317 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 318 Configuration ourConf; 319 if (!cp.getProperties().isEmpty()) { 320 // do an explicit deep copy of the passed configuration 321 ourConf = new Configuration(false); 322 HBaseConfiguration.merge(ourConf, conf); 323 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 324 } else { 325 ourConf = conf; 326 } 327 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 328 }).collect(Collectors.toList()); 329 } 330 331 /** 332 * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception 333 * if there is a problem. nnn 334 */ 335 public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd) 336 throws IOException { 337 String pathPrefix = UUID.randomUUID().toString(); 338 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) { 339 if (attr.getPriority() < 0) { 340 throw new IOException( 341 "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0"); 342 } 343 ClassLoader old = Thread.currentThread().getContextClassLoader(); 344 try { 345 ClassLoader cl; 346 if (attr.getPath() != null) { 347 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 348 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 349 } else { 350 cl = CoprocessorHost.class.getClassLoader(); 351 } 352 Thread.currentThread().setContextClassLoader(cl); 353 if (cl instanceof CoprocessorClassLoader) { 354 String[] includedClassPrefixes = null; 355 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 356 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 357 includedClassPrefixes = prefixes.split(";"); 358 } 359 ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes); 360 } else { 361 cl.loadClass(attr.getClassName()); 362 } 363 } catch (ClassNotFoundException e) { 364 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 365 } finally { 366 Thread.currentThread().setContextClassLoader(old); 367 } 368 } 369 } 370 371 void loadTableCoprocessors(final Configuration conf) { 372 boolean coprocessorsEnabled = 373 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 374 boolean tableCoprocessorsEnabled = 375 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 376 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 377 return; 378 } 379 380 // scan the table attributes for coprocessor load specifications 381 // initialize the coprocessors 382 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 383 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, 384 region.getTableDescriptor())) { 385 // Load encompasses classloading and coprocessor initialization 386 try { 387 RegionCoprocessorEnvironment env = 388 load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf()); 389 if (env == null) { 390 continue; 391 } 392 configured.add(env); 393 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " 394 + region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 395 } catch (Throwable t) { 396 // Coprocessor failed to load, do we abort on error? 397 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 398 abortServer(attr.getClassName(), t); 399 } else { 400 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 401 } 402 } 403 } 404 // add together to coprocessor set for COW efficiency 405 coprocEnvironments.addAll(configured); 406 } 407 408 @Override 409 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 410 Configuration conf) { 411 // If coprocessor exposes any services, register them. 412 for (Service service : instance.getServices()) { 413 region.registerService(service); 414 } 415 ConcurrentMap<String, Object> classData; 416 // make sure only one thread can add maps 417 synchronized (SHARED_DATA_MAP) { 418 // as long as at least one RegionEnvironment holds on to its classData it will 419 // remain in this map 420 classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 421 k -> new ConcurrentHashMap<>()); 422 } 423 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 424 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 425 ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices, 426 classData) 427 : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 428 } 429 430 @Override 431 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 432 throws InstantiationException, IllegalAccessException { 433 try { 434 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 435 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 436 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 437 // For backward compatibility with old CoprocessorService impl which don't extend 438 // RegionCoprocessor. 439 CoprocessorService cs; 440 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 441 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 442 } else { 443 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 444 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 445 return null; 446 } 447 } catch (NoSuchMethodException | InvocationTargetException e) { 448 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 449 } 450 } 451 452 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 453 RegionCoprocessor::getRegionObserver; 454 455 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 456 RegionCoprocessor::getEndpointObserver; 457 458 abstract class RegionObserverOperationWithoutResult 459 extends ObserverOperationWithoutResult<RegionObserver> { 460 public RegionObserverOperationWithoutResult() { 461 super(regionObserverGetter); 462 } 463 464 public RegionObserverOperationWithoutResult(User user) { 465 super(regionObserverGetter, user); 466 } 467 468 public RegionObserverOperationWithoutResult(boolean bypassable) { 469 super(regionObserverGetter, null, bypassable); 470 } 471 472 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 473 super(regionObserverGetter, user, bypassable); 474 } 475 } 476 477 abstract class BulkLoadObserverOperation 478 extends ObserverOperationWithoutResult<BulkLoadObserver> { 479 public BulkLoadObserverOperation(User user) { 480 super(RegionCoprocessor::getBulkLoadObserver, user); 481 } 482 } 483 484 ////////////////////////////////////////////////////////////////////////////////////////////////// 485 // Observer operations 486 ////////////////////////////////////////////////////////////////////////////////////////////////// 487 488 ////////////////////////////////////////////////////////////////////////////////////////////////// 489 // Observer operations 490 ////////////////////////////////////////////////////////////////////////////////////////////////// 491 492 /** 493 * Invoked before a region open. 494 * @throws IOException Signals that an I/O exception has occurred. 495 */ 496 public void preOpen() throws IOException { 497 if (coprocEnvironments.isEmpty()) { 498 return; 499 } 500 execOperation(new RegionObserverOperationWithoutResult() { 501 @Override 502 public void call(RegionObserver observer) throws IOException { 503 observer.preOpen(this); 504 } 505 }); 506 } 507 508 /** 509 * Invoked after a region open 510 */ 511 public void postOpen() { 512 if (coprocEnvironments.isEmpty()) { 513 return; 514 } 515 try { 516 execOperation(new RegionObserverOperationWithoutResult() { 517 @Override 518 public void call(RegionObserver observer) throws IOException { 519 observer.postOpen(this); 520 } 521 }); 522 } catch (IOException e) { 523 LOG.warn(e.toString(), e); 524 } 525 } 526 527 /** 528 * Invoked before a region is closed 529 * @param abortRequested true if the server is aborting 530 */ 531 public void preClose(final boolean abortRequested) throws IOException { 532 execOperation(new RegionObserverOperationWithoutResult() { 533 @Override 534 public void call(RegionObserver observer) throws IOException { 535 observer.preClose(this, abortRequested); 536 } 537 }); 538 } 539 540 /** 541 * Invoked after a region is closed 542 * @param abortRequested true if the server is aborting 543 */ 544 public void postClose(final boolean abortRequested) { 545 try { 546 execOperation(new RegionObserverOperationWithoutResult() { 547 @Override 548 public void call(RegionObserver observer) throws IOException { 549 observer.postClose(this, abortRequested); 550 } 551 552 @Override 553 public void postEnvCall() { 554 shutdown(this.getEnvironment()); 555 } 556 }); 557 } catch (IOException e) { 558 LOG.warn(e.toString(), e); 559 } 560 } 561 562 /** 563 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 564 * available candidates. 565 * <p> 566 * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the 567 * passed in <code>candidates</code>. 568 * @param store The store where compaction is being requested 569 * @param candidates The currently available store files 570 * @param tracker used to track the life cycle of a compaction 571 * @param user the user n 572 */ 573 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 574 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 575 if (coprocEnvironments.isEmpty()) { 576 return false; 577 } 578 boolean bypassable = true; 579 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 580 @Override 581 public void call(RegionObserver observer) throws IOException { 582 observer.preCompactSelection(this, store, candidates, tracker); 583 } 584 }); 585 } 586 587 /** 588 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 589 * candidates. 590 * @param store The store where compaction is being requested 591 * @param selected The store files selected to compact 592 * @param tracker used to track the life cycle of a compaction 593 * @param request the compaction request 594 * @param user the user 595 */ 596 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 597 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 598 throws IOException { 599 if (coprocEnvironments.isEmpty()) { 600 return; 601 } 602 execOperation(new RegionObserverOperationWithoutResult(user) { 603 @Override 604 public void call(RegionObserver observer) throws IOException { 605 observer.postCompactSelection(this, store, selected, tracker, request); 606 } 607 }); 608 } 609 610 /** 611 * Called prior to opening store scanner for compaction. 612 */ 613 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 614 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 615 if (coprocEnvironments.isEmpty()) { 616 return store.getScanInfo(); 617 } 618 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 619 execOperation(new RegionObserverOperationWithoutResult(user) { 620 @Override 621 public void call(RegionObserver observer) throws IOException { 622 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 623 } 624 }); 625 return builder.build(); 626 } 627 628 /** 629 * Called prior to rewriting the store files selected for compaction 630 * @param store the store being compacted 631 * @param scanner the scanner used to read store data during compaction 632 * @param scanType type of Scan 633 * @param tracker used to track the life cycle of a compaction 634 * @param request the compaction request 635 * @param user the user 636 * @return Scanner to use (cannot be null!) n 637 */ 638 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 639 final ScanType scanType, final CompactionLifeCycleTracker tracker, 640 final CompactionRequest request, final User user) throws IOException { 641 InternalScanner defaultResult = scanner; 642 if (coprocEnvironments.isEmpty()) { 643 return defaultResult; 644 } 645 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 646 regionObserverGetter, defaultResult, user) { 647 @Override 648 public InternalScanner call(RegionObserver observer) throws IOException { 649 InternalScanner scanner = 650 observer.preCompact(this, store, getResult(), scanType, tracker, request); 651 if (scanner == null) { 652 throw new CoprocessorException("Null Scanner return disallowed!"); 653 } 654 return scanner; 655 } 656 }); 657 } 658 659 /** 660 * Called after the store compaction has completed. 661 * @param store the store being compacted 662 * @param resultFile the new store file written during compaction 663 * @param tracker used to track the life cycle of a compaction 664 * @param request the compaction request 665 * @param user the user n 666 */ 667 public void postCompact(final HStore store, final HStoreFile resultFile, 668 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 669 throws IOException { 670 execOperation( 671 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) { 672 @Override 673 public void call(RegionObserver observer) throws IOException { 674 observer.postCompact(this, store, resultFile, tracker, request); 675 } 676 }); 677 } 678 679 /** 680 * Invoked before create StoreScanner for flush. 681 */ 682 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 683 throws IOException { 684 if (coprocEnvironments.isEmpty()) { 685 return store.getScanInfo(); 686 } 687 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 688 execOperation(new RegionObserverOperationWithoutResult() { 689 @Override 690 public void call(RegionObserver observer) throws IOException { 691 observer.preFlushScannerOpen(this, store, builder, tracker); 692 } 693 }); 694 return builder.build(); 695 } 696 697 /** 698 * Invoked before a memstore flush 699 * @return Scanner to use (cannot be null!) n 700 */ 701 public InternalScanner preFlush(HStore store, InternalScanner scanner, 702 FlushLifeCycleTracker tracker) throws IOException { 703 if (coprocEnvironments.isEmpty()) { 704 return scanner; 705 } 706 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 707 regionObserverGetter, scanner) { 708 @Override 709 public InternalScanner call(RegionObserver observer) throws IOException { 710 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 711 if (scanner == null) { 712 throw new CoprocessorException("Null Scanner return disallowed!"); 713 } 714 return scanner; 715 } 716 }); 717 } 718 719 /** 720 * Invoked before a memstore flush n 721 */ 722 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 723 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 724 @Override 725 public void call(RegionObserver observer) throws IOException { 726 observer.preFlush(this, tracker); 727 } 728 }); 729 } 730 731 /** 732 * Invoked after a memstore flush n 733 */ 734 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 735 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 736 @Override 737 public void call(RegionObserver observer) throws IOException { 738 observer.postFlush(this, tracker); 739 } 740 }); 741 } 742 743 /** 744 * Invoked before in memory compaction. 745 */ 746 public void preMemStoreCompaction(HStore store) throws IOException { 747 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 748 @Override 749 public void call(RegionObserver observer) throws IOException { 750 observer.preMemStoreCompaction(this, store); 751 } 752 }); 753 } 754 755 /** 756 * Invoked before create StoreScanner for in memory compaction. 757 */ 758 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 759 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 760 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 761 @Override 762 public void call(RegionObserver observer) throws IOException { 763 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 764 } 765 }); 766 return builder.build(); 767 } 768 769 /** 770 * Invoked before compacting memstore. 771 */ 772 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 773 throws IOException { 774 if (coprocEnvironments.isEmpty()) { 775 return scanner; 776 } 777 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 778 regionObserverGetter, scanner) { 779 @Override 780 public InternalScanner call(RegionObserver observer) throws IOException { 781 return observer.preMemStoreCompactionCompact(this, store, getResult()); 782 } 783 }); 784 } 785 786 /** 787 * Invoked after in memory compaction. 788 */ 789 public void postMemStoreCompaction(HStore store) throws IOException { 790 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 791 @Override 792 public void call(RegionObserver observer) throws IOException { 793 observer.postMemStoreCompaction(this, store); 794 } 795 }); 796 } 797 798 /** 799 * Invoked after a memstore flush n 800 */ 801 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 802 throws IOException { 803 if (coprocEnvironments.isEmpty()) { 804 return; 805 } 806 execOperation(new RegionObserverOperationWithoutResult() { 807 @Override 808 public void call(RegionObserver observer) throws IOException { 809 observer.postFlush(this, store, storeFile, tracker); 810 } 811 }); 812 } 813 814 // RegionObserver support 815 /** 816 * Supports Coprocessor 'bypass'. 817 * @param get the Get request 818 * @param results What to return if return is true/'bypass'. 819 * @return true if default processing should be bypassed. 820 * @exception IOException Exception 821 */ 822 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 823 if (coprocEnvironments.isEmpty()) { 824 return false; 825 } 826 boolean bypassable = true; 827 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 828 @Override 829 public void call(RegionObserver observer) throws IOException { 830 observer.preGetOp(this, get, results); 831 } 832 }); 833 } 834 835 /** 836 * @param get the Get request 837 * @param results the result set 838 * @exception IOException Exception 839 */ 840 public void postGet(final Get get, final List<Cell> results) throws IOException { 841 if (coprocEnvironments.isEmpty()) { 842 return; 843 } 844 execOperation(new RegionObserverOperationWithoutResult() { 845 @Override 846 public void call(RegionObserver observer) throws IOException { 847 observer.postGetOp(this, get, results); 848 } 849 }); 850 } 851 852 /** 853 * Supports Coprocessor 'bypass'. 854 * @param get the Get request 855 * @return true or false to return to client if bypassing normal operation, or null otherwise 856 * @exception IOException Exception 857 */ 858 public Boolean preExists(final Get get) throws IOException { 859 boolean bypassable = true; 860 boolean defaultResult = false; 861 if (coprocEnvironments.isEmpty()) { 862 return null; 863 } 864 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 865 regionObserverGetter, defaultResult, bypassable) { 866 @Override 867 public Boolean call(RegionObserver observer) throws IOException { 868 return observer.preExists(this, get, getResult()); 869 } 870 }); 871 } 872 873 /** 874 * @param get the Get request 875 * @param result the result returned by the region server 876 * @return the result to return to the client 877 * @exception IOException Exception 878 */ 879 public boolean postExists(final Get get, boolean result) throws IOException { 880 if (this.coprocEnvironments.isEmpty()) { 881 return result; 882 } 883 return execOperationWithResult( 884 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 885 @Override 886 public Boolean call(RegionObserver observer) throws IOException { 887 return observer.postExists(this, get, getResult()); 888 } 889 }); 890 } 891 892 /** 893 * Supports Coprocessor 'bypass'. 894 * @param put The Put object 895 * @param edit The WALEdit object. 896 * @return true if default processing should be bypassed 897 * @exception IOException Exception 898 */ 899 public boolean prePut(final Put put, final WALEdit edit) throws IOException { 900 if (coprocEnvironments.isEmpty()) { 901 return false; 902 } 903 boolean bypassable = true; 904 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 905 @Override 906 public void call(RegionObserver observer) throws IOException { 907 observer.prePut(this, put, edit); 908 } 909 }); 910 } 911 912 /** 913 * Supports Coprocessor 'bypass'. 914 * @param mutation - the current mutation 915 * @param kv - the current cell 916 * @param byteNow - current timestamp in bytes 917 * @param get - the get that could be used Note that the get only does not specify the family 918 * and qualifier that should be used 919 * @return true if default processing should be bypassed 920 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 921 * Coprocessor for its needs only. Will be removed. 922 */ 923 @Deprecated 924 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, 925 final byte[] byteNow, final Get get) throws IOException { 926 if (coprocEnvironments.isEmpty()) { 927 return false; 928 } 929 boolean bypassable = true; 930 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 931 @Override 932 public void call(RegionObserver observer) throws IOException { 933 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 934 } 935 }); 936 } 937 938 /** 939 * @param put The Put object 940 * @param edit The WALEdit object. 941 * @exception IOException Exception 942 */ 943 public void postPut(final Put put, final WALEdit edit) throws IOException { 944 if (coprocEnvironments.isEmpty()) { 945 return; 946 } 947 execOperation(new RegionObserverOperationWithoutResult() { 948 @Override 949 public void call(RegionObserver observer) throws IOException { 950 observer.postPut(this, put, edit); 951 } 952 }); 953 } 954 955 /** 956 * Supports Coprocessor 'bypass'. 957 * @param delete The Delete object 958 * @param edit The WALEdit object. 959 * @return true if default processing should be bypassed 960 * @exception IOException Exception 961 */ 962 public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException { 963 if (this.coprocEnvironments.isEmpty()) { 964 return false; 965 } 966 boolean bypassable = true; 967 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 968 @Override 969 public void call(RegionObserver observer) throws IOException { 970 observer.preDelete(this, delete, edit); 971 } 972 }); 973 } 974 975 /** 976 * @param delete The Delete object 977 * @param edit The WALEdit object. 978 * @exception IOException Exception 979 */ 980 public void postDelete(final Delete delete, final WALEdit edit) throws IOException { 981 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 982 @Override 983 public void call(RegionObserver observer) throws IOException { 984 observer.postDelete(this, delete, edit); 985 } 986 }); 987 } 988 989 public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 990 throws IOException { 991 if (this.coprocEnvironments.isEmpty()) { 992 return; 993 } 994 execOperation(new RegionObserverOperationWithoutResult() { 995 @Override 996 public void call(RegionObserver observer) throws IOException { 997 observer.preBatchMutate(this, miniBatchOp); 998 } 999 }); 1000 } 1001 1002 public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 1003 throws IOException { 1004 if (this.coprocEnvironments.isEmpty()) { 1005 return; 1006 } 1007 execOperation(new RegionObserverOperationWithoutResult() { 1008 @Override 1009 public void call(RegionObserver observer) throws IOException { 1010 observer.postBatchMutate(this, miniBatchOp); 1011 } 1012 }); 1013 } 1014 1015 public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp, 1016 final boolean success) throws IOException { 1017 if (this.coprocEnvironments.isEmpty()) { 1018 return; 1019 } 1020 execOperation(new RegionObserverOperationWithoutResult() { 1021 @Override 1022 public void call(RegionObserver observer) throws IOException { 1023 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1024 } 1025 }); 1026 } 1027 1028 /** 1029 * Supports Coprocessor 'bypass'. 1030 * @param checkAndMutate the CheckAndMutate object 1031 * @return true or false to return to client if default processing should be bypassed, or null 1032 * otherwise 1033 * @throws IOException if an error occurred on the coprocessor 1034 */ 1035 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException { 1036 boolean bypassable = true; 1037 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1038 if (coprocEnvironments.isEmpty()) { 1039 return null; 1040 } 1041 return execOperationWithResult( 1042 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1043 defaultResult, bypassable) { 1044 @Override 1045 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1046 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1047 } 1048 }); 1049 } 1050 1051 /** 1052 * Supports Coprocessor 'bypass'. 1053 * @param checkAndMutate the CheckAndMutate object 1054 * @return true or false to return to client if default processing should be bypassed, or null 1055 * otherwise 1056 * @throws IOException if an error occurred on the coprocessor 1057 */ 1058 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1059 throws IOException { 1060 boolean bypassable = true; 1061 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1062 if (coprocEnvironments.isEmpty()) { 1063 return null; 1064 } 1065 return execOperationWithResult( 1066 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1067 defaultResult, bypassable) { 1068 @Override 1069 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1070 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1071 } 1072 }); 1073 } 1074 1075 /** 1076 * @param checkAndMutate the CheckAndMutate object 1077 * @param result the result returned by the checkAndMutate 1078 * @return true or false to return to client if default processing should be bypassed, or null 1079 * otherwise 1080 * @throws IOException if an error occurred on the coprocessor 1081 */ 1082 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1083 CheckAndMutateResult result) throws IOException { 1084 if (this.coprocEnvironments.isEmpty()) { 1085 return result; 1086 } 1087 return execOperationWithResult( 1088 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1089 result) { 1090 @Override 1091 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1092 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1093 } 1094 }); 1095 } 1096 1097 /** 1098 * Supports Coprocessor 'bypass'. 1099 * @param append append object 1100 * @param edit The WALEdit object. 1101 * @return result to return to client if default operation should be bypassed, null otherwise 1102 * @throws IOException if an error occurred on the coprocessor 1103 */ 1104 public Result preAppend(final Append append, final WALEdit edit) throws IOException { 1105 boolean bypassable = true; 1106 Result defaultResult = null; 1107 if (this.coprocEnvironments.isEmpty()) { 1108 return defaultResult; 1109 } 1110 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1111 regionObserverGetter, defaultResult, bypassable) { 1112 @Override 1113 public Result call(RegionObserver observer) throws IOException { 1114 return observer.preAppend(this, append, edit); 1115 } 1116 }); 1117 } 1118 1119 /** 1120 * Supports Coprocessor 'bypass'. 1121 * @param append append object 1122 * @return result to return to client if default operation should be bypassed, null otherwise 1123 * @throws IOException if an error occurred on the coprocessor 1124 */ 1125 public Result preAppendAfterRowLock(final Append append) throws IOException { 1126 boolean bypassable = true; 1127 Result defaultResult = null; 1128 if (this.coprocEnvironments.isEmpty()) { 1129 return defaultResult; 1130 } 1131 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1132 regionObserverGetter, defaultResult, bypassable) { 1133 @Override 1134 public Result call(RegionObserver observer) throws IOException { 1135 return observer.preAppendAfterRowLock(this, append); 1136 } 1137 }); 1138 } 1139 1140 /** 1141 * Supports Coprocessor 'bypass'. 1142 * @param increment increment object 1143 * @param edit The WALEdit object. 1144 * @return result to return to client if default operation should be bypassed, null otherwise 1145 * @throws IOException if an error occurred on the coprocessor 1146 */ 1147 public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException { 1148 boolean bypassable = true; 1149 Result defaultResult = null; 1150 if (coprocEnvironments.isEmpty()) { 1151 return defaultResult; 1152 } 1153 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1154 regionObserverGetter, defaultResult, bypassable) { 1155 @Override 1156 public Result call(RegionObserver observer) throws IOException { 1157 return observer.preIncrement(this, increment, edit); 1158 } 1159 }); 1160 } 1161 1162 /** 1163 * Supports Coprocessor 'bypass'. 1164 * @param increment increment object 1165 * @return result to return to client if default operation should be bypassed, null otherwise 1166 * @throws IOException if an error occurred on the coprocessor 1167 */ 1168 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1169 boolean bypassable = true; 1170 Result defaultResult = null; 1171 if (coprocEnvironments.isEmpty()) { 1172 return defaultResult; 1173 } 1174 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1175 regionObserverGetter, defaultResult, bypassable) { 1176 @Override 1177 public Result call(RegionObserver observer) throws IOException { 1178 return observer.preIncrementAfterRowLock(this, increment); 1179 } 1180 }); 1181 } 1182 1183 /** 1184 * @param append Append object 1185 * @param result the result returned by the append 1186 * @param edit The WALEdit object. 1187 * @throws IOException if an error occurred on the coprocessor 1188 */ 1189 public Result postAppend(final Append append, final Result result, final WALEdit edit) 1190 throws IOException { 1191 if (this.coprocEnvironments.isEmpty()) { 1192 return result; 1193 } 1194 return execOperationWithResult( 1195 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1196 @Override 1197 public Result call(RegionObserver observer) throws IOException { 1198 return observer.postAppend(this, append, result, edit); 1199 } 1200 }); 1201 } 1202 1203 /** 1204 * @param increment increment object 1205 * @param result the result returned by postIncrement 1206 * @param edit The WALEdit object. 1207 * @throws IOException if an error occurred on the coprocessor 1208 */ 1209 public Result postIncrement(final Increment increment, Result result, final WALEdit edit) 1210 throws IOException { 1211 if (this.coprocEnvironments.isEmpty()) { 1212 return result; 1213 } 1214 return execOperationWithResult( 1215 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1216 @Override 1217 public Result call(RegionObserver observer) throws IOException { 1218 return observer.postIncrement(this, increment, getResult(), edit); 1219 } 1220 }); 1221 } 1222 1223 /** 1224 * @param scan the Scan specification 1225 * @exception IOException Exception 1226 */ 1227 public void preScannerOpen(final Scan scan) throws IOException { 1228 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1229 @Override 1230 public void call(RegionObserver observer) throws IOException { 1231 observer.preScannerOpen(this, scan); 1232 } 1233 }); 1234 } 1235 1236 /** 1237 * @param scan the Scan specification 1238 * @param s the scanner 1239 * @return the scanner instance to use 1240 * @exception IOException Exception 1241 */ 1242 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1243 if (this.coprocEnvironments.isEmpty()) { 1244 return s; 1245 } 1246 return execOperationWithResult( 1247 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1248 @Override 1249 public RegionScanner call(RegionObserver observer) throws IOException { 1250 return observer.postScannerOpen(this, scan, getResult()); 1251 } 1252 }); 1253 } 1254 1255 /** 1256 * @param s the scanner 1257 * @param results the result set returned by the region server 1258 * @param limit the maximum number of results to return 1259 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1260 * @exception IOException Exception 1261 */ 1262 public Boolean preScannerNext(final InternalScanner s, final List<Result> results, 1263 final int limit) throws IOException { 1264 boolean bypassable = true; 1265 boolean defaultResult = false; 1266 if (coprocEnvironments.isEmpty()) { 1267 return null; 1268 } 1269 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1270 regionObserverGetter, defaultResult, bypassable) { 1271 @Override 1272 public Boolean call(RegionObserver observer) throws IOException { 1273 return observer.preScannerNext(this, s, results, limit, getResult()); 1274 } 1275 }); 1276 } 1277 1278 /** 1279 * @param s the scanner 1280 * @param results the result set returned by the region server 1281 * @param limit the maximum number of results to return n * @return 'has more' indication to 1282 * give to client 1283 * @exception IOException Exception 1284 */ 1285 public boolean postScannerNext(final InternalScanner s, final List<Result> results, 1286 final int limit, boolean hasMore) throws IOException { 1287 if (this.coprocEnvironments.isEmpty()) { 1288 return hasMore; 1289 } 1290 return execOperationWithResult( 1291 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1292 @Override 1293 public Boolean call(RegionObserver observer) throws IOException { 1294 return observer.postScannerNext(this, s, results, limit, getResult()); 1295 } 1296 }); 1297 } 1298 1299 /** 1300 * This will be called by the scan flow when the current scanned row is being filtered out by the 1301 * filter. 1302 * @param s the scanner 1303 * @param curRowCell The cell in the current row which got filtered out 1304 * @return whether more rows are available for the scanner or not n 1305 */ 1306 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1307 throws IOException { 1308 // short circuit for performance 1309 boolean defaultResult = true; 1310 if (!hasCustomPostScannerFilterRow) { 1311 return defaultResult; 1312 } 1313 if (this.coprocEnvironments.isEmpty()) { 1314 return defaultResult; 1315 } 1316 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1317 regionObserverGetter, defaultResult) { 1318 @Override 1319 public Boolean call(RegionObserver observer) throws IOException { 1320 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1321 } 1322 }); 1323 } 1324 1325 /** 1326 * Supports Coprocessor 'bypass'. 1327 * @param s the scanner 1328 * @return true if default behavior should be bypassed, false otherwise 1329 * @exception IOException Exception 1330 */ 1331 // Should this be bypassable? 1332 public boolean preScannerClose(final InternalScanner s) throws IOException { 1333 return execOperation( 1334 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1335 @Override 1336 public void call(RegionObserver observer) throws IOException { 1337 observer.preScannerClose(this, s); 1338 } 1339 }); 1340 } 1341 1342 /** 1343 * @exception IOException Exception 1344 */ 1345 public void postScannerClose(final InternalScanner s) throws IOException { 1346 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1347 @Override 1348 public void call(RegionObserver observer) throws IOException { 1349 observer.postScannerClose(this, s); 1350 } 1351 }); 1352 } 1353 1354 /** 1355 * Called before open store scanner for user scan. 1356 */ 1357 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1358 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1359 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1360 execOperation(new RegionObserverOperationWithoutResult() { 1361 @Override 1362 public void call(RegionObserver observer) throws IOException { 1363 observer.preStoreScannerOpen(this, store, builder); 1364 } 1365 }); 1366 return builder.build(); 1367 } 1368 1369 /** 1370 * @param info the RegionInfo for this region 1371 * @param edits the file of recovered edits 1372 */ 1373 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1374 execOperation( 1375 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1376 @Override 1377 public void call(RegionObserver observer) throws IOException { 1378 observer.preReplayWALs(this, info, edits); 1379 } 1380 }); 1381 } 1382 1383 /** 1384 * @param info the RegionInfo for this region 1385 * @param edits the file of recovered edits 1386 * @throws IOException Exception 1387 */ 1388 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1389 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1390 @Override 1391 public void call(RegionObserver observer) throws IOException { 1392 observer.postReplayWALs(this, info, edits); 1393 } 1394 }); 1395 } 1396 1397 /** 1398 * Supports Coprocessor 'bypass'. 1399 * @return true if default behavior should be bypassed, false otherwise 1400 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with 1401 * something that doesn't expose IntefaceAudience.Private classes. 1402 */ 1403 @Deprecated 1404 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1405 throws IOException { 1406 return execOperation( 1407 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1408 @Override 1409 public void call(RegionObserver observer) throws IOException { 1410 observer.preWALRestore(this, info, logKey, logEdit); 1411 } 1412 }); 1413 } 1414 1415 /** 1416 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with 1417 * something that doesn't expose IntefaceAudience.Private classes. 1418 */ 1419 @Deprecated 1420 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1421 throws IOException { 1422 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1423 @Override 1424 public void call(RegionObserver observer) throws IOException { 1425 observer.postWALRestore(this, info, logKey, logEdit); 1426 } 1427 }); 1428 } 1429 1430 /** 1431 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1432 */ 1433 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1434 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1435 @Override 1436 public void call(RegionObserver observer) throws IOException { 1437 observer.preBulkLoadHFile(this, familyPaths); 1438 } 1439 }); 1440 } 1441 1442 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1443 throws IOException { 1444 return execOperation( 1445 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1446 @Override 1447 public void call(RegionObserver observer) throws IOException { 1448 observer.preCommitStoreFile(this, family, pairs); 1449 } 1450 }); 1451 } 1452 1453 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) 1454 throws IOException { 1455 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1456 @Override 1457 public void call(RegionObserver observer) throws IOException { 1458 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1459 } 1460 }); 1461 } 1462 1463 /** 1464 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1465 * @param map Map of CF to List of file paths for the final loaded files n 1466 */ 1467 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1468 Map<byte[], List<Path>> map) throws IOException { 1469 if (this.coprocEnvironments.isEmpty()) { 1470 return; 1471 } 1472 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1473 @Override 1474 public void call(RegionObserver observer) throws IOException { 1475 observer.postBulkLoadHFile(this, familyPaths, map); 1476 } 1477 }); 1478 } 1479 1480 public void postStartRegionOperation(final Operation op) throws IOException { 1481 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1482 @Override 1483 public void call(RegionObserver observer) throws IOException { 1484 observer.postStartRegionOperation(this, op); 1485 } 1486 }); 1487 } 1488 1489 public void postCloseRegionOperation(final Operation op) throws IOException { 1490 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1491 @Override 1492 public void call(RegionObserver observer) throws IOException { 1493 observer.postCloseRegionOperation(this, op); 1494 } 1495 }); 1496 } 1497 1498 /** 1499 * @param fs fileystem to read from 1500 * @param p path to the file 1501 * @param in {@link FSDataInputStreamWrapper} 1502 * @param size Full size of the file n * @param r original reference file. This will be not null 1503 * only when reading a split file. 1504 * @return a Reader instance to use instead of the base reader if overriding default behavior, 1505 * null otherwise n 1506 */ 1507 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1508 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1509 final Reference r) throws IOException { 1510 if (coprocEnvironments.isEmpty()) { 1511 return null; 1512 } 1513 return execOperationWithResult( 1514 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1515 @Override 1516 public StoreFileReader call(RegionObserver observer) throws IOException { 1517 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1518 } 1519 }); 1520 } 1521 1522 /** 1523 * @param fs fileystem to read from 1524 * @param p path to the file 1525 * @param in {@link FSDataInputStreamWrapper} 1526 * @param size Full size of the file n * @param r original reference file. This will be not null 1527 * only when reading a split file. 1528 * @param reader the base reader instance 1529 * @return The reader to use n 1530 */ 1531 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1532 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1533 final Reference r, final StoreFileReader reader) throws IOException { 1534 if (this.coprocEnvironments.isEmpty()) { 1535 return reader; 1536 } 1537 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>( 1538 regionObserverGetter, reader) { 1539 @Override 1540 public StoreFileReader call(RegionObserver observer) throws IOException { 1541 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1542 } 1543 }); 1544 } 1545 1546 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1547 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1548 if (this.coprocEnvironments.isEmpty()) { 1549 return cellPairs; 1550 } 1551 return execOperationWithResult( 1552 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter, 1553 cellPairs) { 1554 @Override 1555 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1556 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1557 } 1558 }); 1559 } 1560 1561 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1562 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1563 if (this.coprocEnvironments.isEmpty()) { 1564 return cellPairs; 1565 } 1566 return execOperationWithResult( 1567 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter, 1568 cellPairs) { 1569 @Override 1570 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1571 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1572 } 1573 }); 1574 } 1575 1576 public void preWALAppend(WALKey key, WALEdit edit) throws IOException { 1577 if (this.coprocEnvironments.isEmpty()) { 1578 return; 1579 } 1580 execOperation(new RegionObserverOperationWithoutResult() { 1581 @Override 1582 public void call(RegionObserver observer) throws IOException { 1583 observer.preWALAppend(this, key, edit); 1584 } 1585 }); 1586 } 1587 1588 public Message preEndpointInvocation(final Service service, final String methodName, 1589 Message request) throws IOException { 1590 if (coprocEnvironments.isEmpty()) { 1591 return request; 1592 } 1593 return execOperationWithResult( 1594 new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) { 1595 @Override 1596 public Message call(EndpointObserver observer) throws IOException { 1597 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1598 } 1599 }); 1600 } 1601 1602 public void postEndpointInvocation(final Service service, final String methodName, 1603 final Message request, final Message.Builder responseBuilder) throws IOException { 1604 execOperation(coprocEnvironments.isEmpty() 1605 ? null 1606 : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1607 @Override 1608 public void call(EndpointObserver observer) throws IOException { 1609 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1610 } 1611 }); 1612 } 1613 1614 /** 1615 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1616 */ 1617 @Deprecated 1618 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1619 if (this.coprocEnvironments.isEmpty()) { 1620 return result; 1621 } 1622 return execOperationWithResult( 1623 new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) { 1624 @Override 1625 public DeleteTracker call(RegionObserver observer) throws IOException { 1626 return observer.postInstantiateDeleteTracker(this, getResult()); 1627 } 1628 }); 1629 } 1630 1631 ///////////////////////////////////////////////////////////////////////////////////////////////// 1632 // BulkLoadObserver hooks 1633 ///////////////////////////////////////////////////////////////////////////////////////////////// 1634 public void prePrepareBulkLoad(User user) throws IOException { 1635 execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) { 1636 @Override 1637 protected void call(BulkLoadObserver observer) throws IOException { 1638 observer.prePrepareBulkLoad(this); 1639 } 1640 }); 1641 } 1642 1643 public void preCleanupBulkLoad(User user) throws IOException { 1644 execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) { 1645 @Override 1646 protected void call(BulkLoadObserver observer) throws IOException { 1647 observer.preCleanupBulkLoad(this); 1648 } 1649 }); 1650 } 1651}