001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Map; 033import java.util.Optional; 034import java.util.concurrent.ConcurrentMap; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.Coprocessor; 039import org.apache.hadoop.hbase.CoprocessorEnvironment; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseTestCase; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HColumnDescriptor; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HTableDescriptor; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Get; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.regionserver.ChunkCreator; 052import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 053import org.apache.hadoop.hbase.regionserver.HRegion; 054import org.apache.hadoop.hbase.regionserver.InternalScanner; 055import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 056import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; 057import org.apache.hadoop.hbase.regionserver.RegionScanner; 058import org.apache.hadoop.hbase.regionserver.RegionServerServices; 059import org.apache.hadoop.hbase.regionserver.ScanType; 060import org.apache.hadoop.hbase.regionserver.ScannerContext; 061import org.apache.hadoop.hbase.regionserver.Store; 062import org.apache.hadoop.hbase.regionserver.StoreFile; 063import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 064import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 065import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 066import org.apache.hadoop.hbase.testclassification.MediumTests; 067import org.junit.ClassRule; 068import org.junit.Rule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.junit.rules.TestName; 072import org.mockito.Mockito; 073 074@Category({CoprocessorTests.class, MediumTests.class}) 075public class TestCoprocessorInterface { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestCoprocessorInterface.class); 080 081 @Rule public TestName name = new TestName(); 082 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 static final Path DIR = TEST_UTIL.getDataTestDir(); 084 085 private static class CustomScanner implements RegionScanner { 086 087 private RegionScanner delegate; 088 089 public CustomScanner(RegionScanner delegate) { 090 this.delegate = delegate; 091 } 092 093 @Override 094 public boolean next(List<Cell> results) throws IOException { 095 return delegate.next(results); 096 } 097 098 @Override 099 public boolean next(List<Cell> result, ScannerContext scannerContext) 100 throws IOException { 101 return delegate.next(result, scannerContext); 102 } 103 104 @Override 105 public boolean nextRaw(List<Cell> result) 106 throws IOException { 107 return delegate.nextRaw(result); 108 } 109 110 @Override 111 public boolean nextRaw(List<Cell> result, ScannerContext context) 112 throws IOException { 113 return delegate.nextRaw(result, context); 114 } 115 116 @Override 117 public void close() throws IOException { 118 delegate.close(); 119 } 120 121 @Override 122 public RegionInfo getRegionInfo() { 123 return delegate.getRegionInfo(); 124 } 125 126 @Override 127 public boolean isFilterDone() throws IOException { 128 return delegate.isFilterDone(); 129 } 130 131 @Override 132 public boolean reseek(byte[] row) throws IOException { 133 return false; 134 } 135 136 @Override 137 public long getMaxResultSize() { 138 return delegate.getMaxResultSize(); 139 } 140 141 @Override 142 public long getMvccReadPoint() { 143 return delegate.getMvccReadPoint(); 144 } 145 146 @Override 147 public int getBatch() { 148 return delegate.getBatch(); 149 } 150 } 151 152 public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver { 153 154 private boolean startCalled; 155 private boolean stopCalled; 156 private boolean preOpenCalled; 157 private boolean postOpenCalled; 158 private boolean preCloseCalled; 159 private boolean postCloseCalled; 160 private boolean preCompactCalled; 161 private boolean postCompactCalled; 162 private boolean preFlushCalled; 163 private boolean postFlushCalled; 164 private ConcurrentMap<String, Object> sharedData; 165 166 @Override 167 public void start(CoprocessorEnvironment e) { 168 sharedData = ((RegionCoprocessorEnvironment)e).getSharedData(); 169 // using new String here, so that there will be new object on each invocation 170 sharedData.putIfAbsent("test1", new Object()); 171 startCalled = true; 172 } 173 174 @Override 175 public void stop(CoprocessorEnvironment e) { 176 sharedData = null; 177 stopCalled = true; 178 } 179 180 @Override 181 public Optional<RegionObserver> getRegionObserver() { 182 return Optional.of(this); 183 } 184 185 @Override 186 public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) { 187 preOpenCalled = true; 188 } 189 @Override 190 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { 191 postOpenCalled = true; 192 } 193 @Override 194 public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) { 195 preCloseCalled = true; 196 } 197 @Override 198 public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) { 199 postCloseCalled = true; 200 } 201 @Override 202 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, 203 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 204 CompactionRequest request) { 205 preCompactCalled = true; 206 return scanner; 207 } 208 @Override 209 public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, 210 Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker, 211 CompactionRequest request) { 212 postCompactCalled = true; 213 } 214 215 @Override 216 public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e, 217 FlushLifeCycleTracker tracker) { 218 preFlushCalled = true; 219 } 220 221 @Override 222 public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, 223 FlushLifeCycleTracker tracker) { 224 postFlushCalled = true; 225 } 226 227 @Override 228 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 229 final Scan scan, final RegionScanner s) throws IOException { 230 return new CustomScanner(s); 231 } 232 233 boolean wasStarted() { 234 return startCalled; 235 } 236 boolean wasStopped() { 237 return stopCalled; 238 } 239 boolean wasOpened() { 240 return (preOpenCalled && postOpenCalled); 241 } 242 boolean wasClosed() { 243 return (preCloseCalled && postCloseCalled); 244 } 245 boolean wasFlushed() { 246 return (preFlushCalled && postFlushCalled); 247 } 248 boolean wasCompacted() { 249 return (preCompactCalled && postCompactCalled); 250 } 251 Map<String, Object> getSharedData() { 252 return sharedData; 253 } 254 } 255 256 public static class CoprocessorII implements RegionCoprocessor { 257 private ConcurrentMap<String, Object> sharedData; 258 259 @Override 260 public void start(CoprocessorEnvironment e) { 261 sharedData = ((RegionCoprocessorEnvironment)e).getSharedData(); 262 sharedData.putIfAbsent("test2", new Object()); 263 } 264 265 @Override 266 public void stop(CoprocessorEnvironment e) { 267 sharedData = null; 268 } 269 270 @Override 271 public Optional<RegionObserver> getRegionObserver() { 272 return Optional.of(new RegionObserver() { 273 @Override 274 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, 275 final Get get, final List<Cell> results) throws IOException { 276 throw new RuntimeException(); 277 } 278 }); 279 } 280 281 Map<String, Object> getSharedData() { 282 return sharedData; 283 } 284 } 285 286 @Test 287 public void testSharedData() throws IOException { 288 TableName tableName = TableName.valueOf(name.getMethodName()); 289 byte [][] families = { fam1, fam2, fam3 }; 290 291 Configuration hc = initConfig(); 292 HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[]{}, families); 293 294 for (int i = 0; i < 3; i++) { 295 HBaseTestCase.addContent(region, fam3); 296 region.flush(true); 297 } 298 299 region.compact(false); 300 301 region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class); 302 303 Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class); 304 Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class); 305 Object o = ((CoprocessorImpl)c).getSharedData().get("test1"); 306 Object o2 = ((CoprocessorII)c2).getSharedData().get("test2"); 307 assertNotNull(o); 308 assertNotNull(o2); 309 // to coprocessors get different sharedDatas 310 assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData()); 311 c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class); 312 c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class); 313 // make sure that all coprocessor of a class have identical sharedDatas 314 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o); 315 assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2); 316 317 // now have all Environments fail 318 try { 319 byte [] r = region.getRegionInfo().getStartKey(); 320 if (r == null || r.length <= 0) { 321 // Its the start row. Can't ask for null. Ask for minimal key instead. 322 r = new byte [] {0}; 323 } 324 Get g = new Get(r); 325 region.get(g); 326 fail(); 327 } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) { 328 } 329 assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class)); 330 c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class); 331 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o); 332 c = c2 = null; 333 // perform a GC 334 System.gc(); 335 // reopen the region 336 region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class); 337 c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class); 338 // CPimpl is unaffected, still the same reference 339 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o); 340 c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class); 341 // new map and object created, hence the reference is different 342 // hence the old entry was indeed removed by the GC and new one has been created 343 Object o3 = ((CoprocessorII)c2).getSharedData().get("test2"); 344 assertFalse(o3 == o2); 345 HBaseTestingUtility.closeRegionAndWAL(region); 346 } 347 348 @Test 349 public void testCoprocessorInterface() throws IOException { 350 TableName tableName = TableName.valueOf(name.getMethodName()); 351 byte [][] families = { fam1, fam2, fam3 }; 352 353 Configuration hc = initConfig(); 354 HRegion region = initHRegion(tableName, name.getMethodName(), hc, 355 new Class<?>[]{CoprocessorImpl.class}, families); 356 for (int i = 0; i < 3; i++) { 357 HBaseTestCase.addContent(region, fam3); 358 region.flush(true); 359 } 360 361 region.compact(false); 362 363 // HBASE-4197 364 Scan s = new Scan(); 365 RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s)); 366 assertTrue(scanner instanceof CustomScanner); 367 // this would throw an exception before HBASE-4197 368 scanner.next(new ArrayList<>()); 369 370 HBaseTestingUtility.closeRegionAndWAL(region); 371 Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class); 372 373 assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted()); 374 assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped()); 375 assertTrue(((CoprocessorImpl)c).wasOpened()); 376 assertTrue(((CoprocessorImpl)c).wasClosed()); 377 assertTrue(((CoprocessorImpl)c).wasFlushed()); 378 assertTrue(((CoprocessorImpl)c).wasCompacted()); 379 } 380 381 HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses) 382 throws IOException { 383 //RegionInfo info = new RegionInfo(tableName, null, null, false); 384 HRegion r = HRegion.openHRegion(closedRegion, null); 385 386 // this following piece is a hack. currently a coprocessorHost 387 // is secretly loaded at OpenRegionHandler. we don't really 388 // start a region server here, so just manually create cphost 389 // and set it to region. 390 Configuration conf = TEST_UTIL.getConfiguration(); 391 RegionCoprocessorHost host = new RegionCoprocessorHost(r, 392 Mockito.mock(RegionServerServices.class), conf); 393 r.setCoprocessorHost(host); 394 395 for (Class<?> implClass : implClasses) { 396 host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf); 397 } 398 // we need to manually call pre- and postOpen here since the 399 // above load() is not the real case for CP loading. A CP is 400 // expected to be loaded by default from 1) configuration; or 2) 401 // HTableDescriptor. If it's loaded after HRegion initialized, 402 // the pre- and postOpen() won't be triggered automatically. 403 // Here we have to call pre and postOpen explicitly. 404 host.preOpen(); 405 host.postOpen(); 406 return r; 407 } 408 409 HRegion initHRegion (TableName tableName, String callingMethod, 410 Configuration conf, Class<?> [] implClasses, byte [][] families) 411 throws IOException { 412 HTableDescriptor htd = new HTableDescriptor(tableName); 413 for(byte [] family : families) { 414 htd.addFamily(new HColumnDescriptor(family)); 415 } 416 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 417 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 418 RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 419 .setStartKey(null) 420 .setEndKey(null) 421 .setSplit(false) 422 .build(); 423 Path path = new Path(DIR + callingMethod); 424 HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); 425 426 // this following piece is a hack. 427 RegionCoprocessorHost host = 428 new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf); 429 r.setCoprocessorHost(host); 430 431 for (Class<?> implClass : implClasses) { 432 host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf); 433 Coprocessor c = host.findCoprocessor(implClass.getName()); 434 assertNotNull(c); 435 } 436 437 // Here we have to call pre and postOpen explicitly. 438 host.preOpen(); 439 host.postOpen(); 440 return r; 441 } 442 443 private Configuration initConfig() { 444 // Always compact if there is more than one store file. 445 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2); 446 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); 447 // Increase the amount of time between client retries 448 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000); 449 // This size should make it so we always split using the addContent 450 // below. After adding all data, the first region is 1.3M 451 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 452 1024 * 128); 453 TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false); 454 455 return TEST_UTIL.getConfiguration(); 456 } 457}