/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam3;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;

/**
 * Tests the region coprocessor framework directly against an {@link HRegion} (no mini-cluster):
 * coprocessor loading, lifecycle callbacks (start/stop, pre/postOpen, pre/postClose, flush,
 * compact), scanner wrapping via {@code postScannerOpen}, and the per-coprocessor-class shared
 * data map semantics.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestCoprocessorInterface {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCoprocessorInterface.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static final Path DIR = TEST_UTIL.getDataTestDir();

  /**
   * RegionScanner wrapper handed out by {@link CoprocessorImpl#postScannerOpen}. It delegates
   * every call to the wrapped scanner, except {@link #reseek(byte[])}, which always reports
   * {@code false}. Tests use {@code instanceof CustomScanner} to prove the coprocessor's
   * scanner-wrapping hook was honored.
   */
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<? super ExtendedCell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
      throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
      throws IOException {
      return delegate.nextRaw(result, context);
    }

    @Override
    public Set<Path> getFilesRead() {
      return delegate.getFilesRead();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Deliberately not delegated: always claims the reseek did not happen.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

  /**
   * Coprocessor that records which lifecycle/observer callbacks the host invoked, so tests can
   * assert the full open/flush/compact/close cycle was driven. It also publishes an entry into
   * the environment's shared data map on start and wraps opened scanners in
   * {@link CustomScanner}.
   */
  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {

    // Each flag is flipped exactly once by its corresponding callback below.
    private boolean startCalled;
    private boolean stopCalled;
    private boolean preOpenCalled;
    private boolean postOpenCalled;
    private boolean preCloseCalled;
    private boolean postCloseCalled;
    private boolean preCompactCalled;
    private boolean postCompactCalled;
    private boolean preFlushCalled;
    private boolean postFlushCalled;
    // Shared data map provided by the host; common to all instances of this coprocessor class.
    private ConcurrentMap<String, Object> sharedData;

    @Override
    public void start(CoprocessorEnvironment e) {
      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
      // A distinct Object per invocation; putIfAbsent keeps the first one, so the identity
      // comparisons in testSharedData can detect whether the shared map was re-created.
      sharedData.putIfAbsent("test1", new Object());
      startCalled = true;
    }

    @Override
    public void stop(CoprocessorEnvironment e) {
      sharedData = null;
      stopCalled = true;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
      preOpenCalled = true;
    }

    @Override
    public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
      postOpenCalled = true;
    }

    @Override
    public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      boolean abortRequested) {
      preCloseCalled = true;
    }

    @Override
    public void postClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      boolean abortRequested) {
      postCloseCalled = true;
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
      preCompactCalled = true;
      return scanner;
    }

    @Override
    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
      postCompactCalled = true;
    }

    @Override
    public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      FlushLifeCycleTracker tracker) {
      preFlushCalled = true;
    }

    @Override
    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      FlushLifeCycleTracker tracker) {
      postFlushCalled = true;
    }

    @Override
    public RegionScanner postScannerOpen(
      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan,
      final RegionScanner s) throws IOException {
      // Wrap the scanner so the test can verify this hook's return value is propagated.
      return new CustomScanner(s);
    }

    boolean wasStarted() {
      return startCalled;
    }

    boolean wasStopped() {
      return stopCalled;
    }

    boolean wasOpened() {
      return (preOpenCalled && postOpenCalled);
    }

    boolean wasClosed() {
      return (preCloseCalled && postCloseCalled);
    }

    boolean wasFlushed() {
      return (preFlushCalled && postFlushCalled);
    }

    boolean wasCompacted() {
      return (preCompactCalled && postCompactCalled);
    }

    Map<String, Object> getSharedData() {
      return sharedData;
    }
  }

  /**
   * Coprocessor whose observer throws from {@code preGetOp}. With
   * {@code CoprocessorHost.ABORT_ON_ERROR_KEY} set to false (see {@link #initConfig()}), the
   * host is expected to remove this failing coprocessor rather than abort, which testSharedData
   * verifies.
   */
  public static class CoprocessorII implements RegionCoprocessor {
    private ConcurrentMap<String, Object> sharedData;

    @Override
    public void start(CoprocessorEnvironment e) {
      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
      sharedData.putIfAbsent("test2", new Object());
    }

    @Override
    public void stop(CoprocessorEnvironment e) {
      sharedData = null;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(new RegionObserver() {
        @Override
        public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
          final Get get, final List<Cell> results) throws IOException {
          // Any Get against the region makes this coprocessor's environment fail.
          throw new RuntimeException();
        }
      });
    }

    Map<String, Object> getSharedData() {
      return sharedData;
    }
  }

  /**
   * Verifies shared-data semantics: distinct coprocessor classes get distinct shared maps, all
   * instances of one class share the same map, a failing coprocessor is removed without
   * disturbing the other's shared data, and a removed class's shared data is recreated (after
   * GC) on the next load.
   */
  @Test
  public void testSharedData() throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    byte[][] families = { fam1, fam2, fam3 };

    Configuration hc = initConfig();
    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[] {}, families);

    for (int i = 0; i < 3; i++) {
      HTestConst.addContent(region, fam3);
      region.flush(true);
    }

    region.compact(false);

    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);

    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    Object o = ((CoprocessorImpl) c).getSharedData().get("test1");
    Object o2 = ((CoprocessorII) c2).getSharedData().get("test2");
    assertNotNull(o);
    assertNotNull(o2);
    // two coprocessor classes get different sharedData maps
    assertFalse(((CoprocessorImpl) c).getSharedData() == ((CoprocessorII) c2).getSharedData());
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    // make sure that all coprocessors of a class have identical sharedDatas
    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
    assertTrue(((CoprocessorII) c2).getSharedData().get("test2") == o2);

    // now have all Environments fail
    try {
      byte[] r = region.getRegionInfo().getStartKey();
      if (r == null || r.length <= 0) {
        // It's the start row. Can't ask for null. Ask for minimal key instead.
        r = new byte[] { 0 };
      }
      Get g = new Get(r);
      region.get(g);
      fail();
    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
      // expected: CoprocessorII.preGetOp threw, and with ABORT_ON_ERROR_KEY=false the host
      // surfaces a DoNotRetryIOException and removes the failing coprocessor.
    }
    // CoprocessorII has been removed; CoprocessorImpl is untouched.
    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
    // Drop the only strong references so the GC below can collect CoprocessorII's shared data.
    c = c2 = null;
    // perform a GC
    System.gc();
    // reopen the region
    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    // CPimpl is unaffected, still the same reference
    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    // new map and object created, hence the reference is different
    // hence the old entry was indeed removed by the GC and new one has been created
    Object o3 = ((CoprocessorII) c2).getSharedData().get("test2");
    assertFalse(o3 == o2);
    HBaseTestingUtil.closeRegionAndWAL(region);
  }

  /**
   * Exercises the full coprocessor lifecycle against a single region and asserts every
   * start/open/flush/compact/close/stop callback fired. Also checks (HBASE-4197) that the
   * scanner returned by {@code postScannerOpen} is the coprocessor-wrapped one.
   */
  @Test
  public void testCoprocessorInterface() throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    byte[][] families = { fam1, fam2, fam3 };

    Configuration hc = initConfig();
    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
      new Class<?>[] { CoprocessorImpl.class }, families);
    for (int i = 0; i < 3; i++) {
      HTestConst.addContent(region, fam3);
      region.flush(true);
    }

    region.compact(false);

    // HBASE-4197
    Scan s = new Scan();
    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
    assertTrue(scanner instanceof CustomScanner);
    // this would throw an exception before HBASE-4197
    scanner.next(new ArrayList<>());

    // Close first so the stop/preClose/postClose callbacks have fired before the assertions.
    HBaseTestingUtil.closeRegionAndWAL(region);
    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);

    assertTrue("Coprocessor not started", ((CoprocessorImpl) c).wasStarted());
    assertTrue("Coprocessor not stopped", ((CoprocessorImpl) c).wasStopped());
    assertTrue(((CoprocessorImpl) c).wasOpened());
    assertTrue(((CoprocessorImpl) c).wasClosed());
    assertTrue(((CoprocessorImpl) c).wasFlushed());
    assertTrue(((CoprocessorImpl) c).wasCompacted());
  }

  /**
   * Reopens a closed region and manually installs a fresh coprocessor host loaded with the
   * given coprocessor classes.
   */
  HRegion reopenRegion(final HRegion closedRegion, Class<?>... implClasses) throws IOException {
    HRegion r = HRegion.openHRegion(closedRegion, null);

    // this following piece is a hack. currently a coprocessorHost
    // is secretly loaded at OpenRegionHandler. we don't really
    // start a region server here, so just manually create cphost
    // and set it to region.
    Configuration conf = TEST_UTIL.getConfiguration();
    RegionCoprocessorHost host =
      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
    r.setCoprocessorHost(host);

    for (Class<?> implClass : implClasses) {
      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
    }
    // we need to manually call pre- and postOpen here since the
    // above load() is not the real case for CP loading. A CP is
    // expected to be loaded by default from 1) configuration; or 2)
    // HTableDescriptor. If it's loaded after HRegion initialized,
    // the pre- and postOpen() won't be triggered automatically.
    // Here we have to call pre and postOpen explicitly.
    host.preOpen();
    host.postOpen();
    return r;
  }

  /**
   * Creates a fresh region (with WAL) for the given table/families and manually installs a
   * coprocessor host loaded with the given coprocessor classes.
   */
  HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
    Class<?>[] implClasses, byte[][] families) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    // Set up the ChunkCreator singleton used by the memstore — normally done by the region
    // server at startup; needed here because we create the region directly.
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(null).setEndKey(null)
      .setSplit(false).build();
    Path path = new Path(DIR + callingMethod);
    HRegion r = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());

    // this following piece is a hack.
    RegionCoprocessorHost host =
      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
    r.setCoprocessorHost(host);

    for (Class<?> implClass : implClasses) {
      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
      Coprocessor c = host.findCoprocessor(implClass.getName());
      assertNotNull(c);
    }

    // Here we have to call pre and postOpen explicitly.
    host.preOpen();
    host.postOpen();
    return r;
  }

  /**
   * Builds the shared test configuration. Notably sets ABORT_ON_ERROR_KEY=false so a throwing
   * coprocessor is removed instead of aborting (required by testSharedData).
   */
  private Configuration initConfig() {
    // Always compact if there is more than one store file.
    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
    // This size should make it so we always split using the addContent
    // below. After adding all data, the first region is 1.3M
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);

    return TEST_UTIL.getConfiguration();
  }
}