/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the region coprocessor lifecycle hooks and the per-class shared data map against a
 * region that is created locally, with a manually attached {@link RegionCoprocessorHost}.
 */
@Category({CoprocessorTests.class, SmallTests.class})
public class TestCoprocessorInterface {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCoprocessorInterface.class);

  @Rule public TestName name = new TestName();
  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorInterface.class);
  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  static final Path DIR = TEST_UTIL.getDataTestDir();

  /**
   * {@link RegionScanner} wrapper handed back by {@link CoprocessorImpl#postScannerOpen}. Every
   * call except {@link #reseek(byte[])} is forwarded to the wrapped scanner.
   */
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext)
        throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result)
        throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context)
        throws IOException {
      return delegate.nextRaw(result, context);
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not forwarded to the delegate; always reports false. Nothing in this test reseeks.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

  /**
   * Coprocessor that records which lifecycle hooks were invoked, publishes a {@code "test1"}
   * entry in its shared data map and wraps every opened scanner in a {@link CustomScanner}.
   */
  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {

    private boolean startCalled;
    private boolean stopCalled;
    private boolean preOpenCalled;
    private boolean postOpenCalled;
    private boolean preCloseCalled;
    private boolean postCloseCalled;
    private boolean preCompactCalled;
    private boolean postCompactCalled;
    private boolean preFlushCalled;
    private boolean postFlushCalled;
    private ConcurrentMap<String, Object> sharedData;

    @Override
    public void start(CoprocessorEnvironment e) {
      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
      // A fresh Object is offered on each invocation; putIfAbsent keeps the first one stored, so
      // the test can check by reference whether an existing shared map was reused.
      sharedData.putIfAbsent("test1", new Object());
      startCalled = true;
    }

    @Override
    public void stop(CoprocessorEnvironment e) {
      sharedData = null;
      stopCalled = true;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
      preOpenCalled = true;
    }

    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
      postOpenCalled = true;
    }

    @Override
    public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
      preCloseCalled = true;
    }

    @Override
    public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
      postCloseCalled = true;
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
        Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) {
      preCompactCalled = true;
      return scanner;
    }

    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
        Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
        CompactionRequest request) {
      postCompactCalled = true;
    }

    @Override
    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
        FlushLifeCycleTracker tracker) {
      preFlushCalled = true;
    }

    @Override
    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
        FlushLifeCycleTracker tracker) {
      postFlushCalled = true;
    }

    @Override
    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }

    boolean wasStarted() {
      return startCalled;
    }

    boolean wasStopped() {
      return stopCalled;
    }

    boolean wasOpened() {
      return (preOpenCalled && postOpenCalled);
    }

    boolean wasClosed() {
      return (preCloseCalled && postCloseCalled);
    }

    boolean wasFlushed() {
      return (preFlushCalled && postFlushCalled);
    }

    boolean wasCompacted() {
      return (preCompactCalled && postCompactCalled);
    }

    Map<String, Object> getSharedData() {
      return sharedData;
    }
  }

  /**
   * Coprocessor that publishes a {@code "test2"} entry in its shared data map and whose observer
   * throws a {@link RuntimeException} from {@code preGetOp}.
   */
  public static class CoprocessorII implements RegionCoprocessor {
    private ConcurrentMap<String, Object> sharedData;

    @Override
    public void start(CoprocessorEnvironment e) {
      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
      sharedData.putIfAbsent("test2", new Object());
    }

    @Override
    public void stop(CoprocessorEnvironment e) {
      sharedData = null;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(new RegionObserver() {
        @Override
        public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
            final Get get, final List<Cell> results) throws IOException {
          throw new RuntimeException();
        }
      });
    }

    Map<String, Object> getSharedData() {
      return sharedData;
    }
  }

  /**
   * Checks that each coprocessor class gets its own shared data map, that the map stays the same
   * for a coprocessor that keeps running, and that a new one is created for a coprocessor that
   * was dropped after failing.
   */
  @Test
  public void testSharedData() throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    byte [][] families = { fam1, fam2, fam3 };

    Configuration hc = initConfig();
    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[]{}, families);

    for (int i = 0; i < 3; i++) {
      HBaseTestCase.addContent(region, fam3);
      region.flush(true);
    }

    region.compact(false);

    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);

    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
    Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
    assertNotNull(o);
    assertNotNull(o2);
    // two coprocessors get different sharedDatas
    assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    // make sure that all coprocessor of a class have identical sharedDatas
    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
    assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);

    // now have all Environments fail
    try {
      byte [] r = region.getRegionInfo().getStartKey();
      if (r == null || r.length <= 0) {
        // Its the start row.  Can't ask for null.  Ask for minimal key instead.
        r = new byte [] {0};
      }
      Get g = new Get(r);
      region.get(g);
      fail("Expected the get to fail because CoprocessorII throws from preGetOp");
    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
      // Expected: the get must fail. The assertions below check the state after the failure.
    }
    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
    c = c2 = null;
    // perform a GC
    System.gc();
    // reopen the region
    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
    // CPimpl is unaffected, still the same reference
    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
    // new map and object created, hence the reference is different
    // hence the old entry was indeed removed by the GC and new one has been created
    Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
    assertFalse(o3 == o2);
    HBaseTestingUtility.closeRegionAndWAL(region);
  }

  /**
   * Loads {@link CoprocessorImpl} on a region, drives flushes, a compaction, a scan and a close,
   * then checks that the corresponding hooks were all observed.
   */
  @Test
  public void testCoprocessorInterface() throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    byte [][] families = { fam1, fam2, fam3 };

    Configuration hc = initConfig();
    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
      new Class<?>[]{CoprocessorImpl.class}, families);
    for (int i = 0; i < 3; i++) {
      HBaseTestCase.addContent(region, fam3);
      region.flush(true);
    }

    region.compact(false);

    // HBASE-4197
    Scan s = new Scan();
    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
    assertTrue(scanner instanceof CustomScanner);
    // this would throw an exception before HBASE-4197
    scanner.next(new ArrayList<>());

    HBaseTestingUtility.closeRegionAndWAL(region);
    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);

    assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
    assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
    assertTrue(((CoprocessorImpl)c).wasOpened());
    assertTrue(((CoprocessorImpl)c).wasClosed());
    assertTrue(((CoprocessorImpl)c).wasFlushed());
    assertTrue(((CoprocessorImpl)c).wasCompacted());
  }

  /**
   * Opens a new {@link HRegion} from {@code closedRegion}, attaches a fresh coprocessor host,
   * loads the given coprocessors and fires the open hooks.
   *
   * @param closedRegion region to reopen from
   * @param implClasses coprocessor classes to load; each must implement {@link RegionCoprocessor}
   * @return the newly opened region
   * @throws IOException if opening the region or loading a coprocessor fails
   * @throws ClassCastException if an entry of {@code implClasses} is not a
   *         {@link RegionCoprocessor}
   */
  HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
      throws IOException {
    HRegion r = HRegion.openHRegion(closedRegion, null);

    // this following piece is a hack. currently a coprocessorHost
    // is secretly loaded at OpenRegionHandler. we don't really
    // start a region server here, so just manually create cphost
    // and set it to region.
    Configuration conf = TEST_UTIL.getConfiguration();
    RegionCoprocessorHost host = new RegionCoprocessorHost(r,
        Mockito.mock(RegionServerServices.class), conf);
    r.setCoprocessorHost(host);

    for (Class<?> implClass : implClasses) {
      // asSubclass is a checked narrowing, unlike a generic cast which is unchecked.
      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
    }
    // we need to manually call pre- and postOpen here since the
    // above load() is not the real case for CP loading. A CP is
    // expected to be loaded by default from 1) configuration; or 2)
    // HTableDescriptor. If it's loaded after HRegion initialized,
    // the pre- and postOpen() won't be triggered automatically.
    // Here we have to call pre and postOpen explicitly.
    host.preOpen();
    host.postOpen();
    return r;
  }

  /**
   * Creates a region (and its WAL) for {@code tableName} under {@link #DIR}, attaches a
   * coprocessor host, loads the given coprocessors and fires the open hooks.
   *
   * @param tableName table the region belongs to
   * @param callingMethod name of the sub-directory of {@link #DIR} used for this region
   * @param conf configuration used to create the region and the coprocessor host
   * @param implClasses coprocessor classes to load; each must implement {@link RegionCoprocessor}
   * @param families column families to add to the table descriptor
   * @return the created region
   * @throws IOException if creating the region or loading a coprocessor fails
   * @throws ClassCastException if an entry of {@code implClasses} is not a
   *         {@link RegionCoprocessor}
   */
  HRegion initHRegion (TableName tableName, String callingMethod,
      Configuration conf, Class<?> [] implClasses, byte [][] families)
      throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    for(byte [] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
        .setStartKey(null)
        .setEndKey(null)
        .setSplit(false)
        .build();
    // Resolve as a child of DIR. Plain string concatenation has no separator and would place
    // the region next to the test data directory instead of inside it.
    Path path = new Path(DIR, callingMethod);
    HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);

    // this following piece is a hack.
    RegionCoprocessorHost host =
        new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
    r.setCoprocessorHost(host);

    for (Class<?> implClass : implClasses) {
      // asSubclass is a checked narrowing, unlike a generic cast which is unchecked.
      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
      Coprocessor c = host.findCoprocessor(implClass.getName());
      assertNotNull(c);
    }

    // Here we have to call pre and postOpen explicitly.
    host.preOpen();
    host.postOpen();
    return r;
  }

  /**
   * Applies the settings these tests rely on to the shared test configuration and returns it.
   *
   * @return the configuration of {@link #TEST_UTIL}, modified in place
   */
  private Configuration initConfig() {
    // Always compact if there is more than one store file.
    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
    // This size should make it so we always split using the addContent
    // below.  After adding all data, the first region is 1.3M
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
        1024 * 128);
    TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
        true);
    // Keep a failing coprocessor from aborting; testSharedData relies on the failure surfacing
    // as an exception to the caller instead.
    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);

    return TEST_UTIL.getConfiguration();
  }
}