001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNull; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.CountDownLatch; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.Coprocessor; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Admin; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.filter.FilterBase; 050import org.apache.hadoop.hbase.regionserver.ChunkCreator; 051import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 052import org.apache.hadoop.hbase.regionserver.HRegion; 053import org.apache.hadoop.hbase.regionserver.HRegionServer; 054import org.apache.hadoop.hbase.regionserver.HStore; 055import org.apache.hadoop.hbase.regionserver.InternalScanner; 056import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 057import org.apache.hadoop.hbase.regionserver.Region; 058import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; 059import org.apache.hadoop.hbase.regionserver.RegionServerServices; 060import org.apache.hadoop.hbase.regionserver.ScanType; 061import org.apache.hadoop.hbase.regionserver.ScannerContext; 062import org.apache.hadoop.hbase.regionserver.Store; 063import org.apache.hadoop.hbase.regionserver.StoreScanner; 064import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 065import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 066import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 067import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 068import org.apache.hadoop.hbase.security.User; 069import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 070import org.apache.hadoop.hbase.testclassification.MediumTests; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.wal.WAL; 073import org.junit.jupiter.api.BeforeEach; 074import org.junit.jupiter.api.Tag; 075import org.junit.jupiter.api.Test; 076import org.junit.jupiter.api.TestInfo; 077 078@Tag(CoprocessorTests.TAG) 079@Tag(MediumTests.TAG) 080public class TestRegionObserverScannerOpenHook { 081 082 private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 083 static final Path DIR = UTIL.getDataTestDir(); 084 085 private String currentTestName; 086 087 @BeforeEach 088 public void setUp(TestInfo testInfo) { 089 currentTestName = testInfo.getTestMethod().get().getName(); 090 } 091 092 public static class NoDataFilter extends FilterBase { 093 094 @Override 095 public ReturnCode filterCell(final Cell ignored) { 096 return ReturnCode.SKIP; 097 } 098 099 @Override 100 public boolean filterAllRemaining() throws IOException { 101 return true; 102 } 103 104 @Override 105 public boolean filterRow() throws IOException { 106 return true; 107 } 108 } 109 110 /** 111 * Do the default logic in {@link RegionObserver} interface. 112 */ 113 public static class EmptyRegionObsever implements RegionCoprocessor, RegionObserver { 114 @Override 115 public Optional<RegionObserver> getRegionObserver() { 116 return Optional.of(this); 117 } 118 } 119 120 /** 121 * Don't return any data from a scan by creating a custom {@link StoreScanner}. 122 */ 123 public static class NoDataFromScan implements RegionCoprocessor, RegionObserver { 124 @Override 125 public Optional<RegionObserver> getRegionObserver() { 126 return Optional.of(this); 127 } 128 129 @Override 130 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 131 List<Cell> result) throws IOException { 132 c.bypass(); 133 } 134 135 @Override 136 public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan) 137 throws IOException { 138 scan.setFilter(new NoDataFilter()); 139 } 140 } 141 142 private static final InternalScanner NO_DATA = new InternalScanner() { 143 144 @Override 145 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 146 throws IOException { 147 return false; 148 } 149 150 @Override 151 public void close() throws IOException { 152 } 153 }; 154 155 /** 156 * Don't allow any data in a flush by creating a custom {@link StoreScanner}. 157 */ 158 public static class NoDataFromFlush implements RegionCoprocessor, RegionObserver { 159 @Override 160 public Optional<RegionObserver> getRegionObserver() { 161 return Optional.of(this); 162 } 163 164 @Override 165 public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 166 Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 167 return NO_DATA; 168 } 169 } 170 171 /** 172 * Don't allow any data to be written out in the compaction by creating a custom 173 * {@link StoreScanner}. 174 */ 175 public static class NoDataFromCompaction implements RegionCoprocessor, RegionObserver { 176 @Override 177 public Optional<RegionObserver> getRegionObserver() { 178 return Optional.of(this); 179 } 180 181 @Override 182 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 183 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 184 CompactionRequest request) throws IOException { 185 return NO_DATA; 186 } 187 } 188 189 HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf, 190 byte[]... families) throws IOException { 191 TableDescriptorBuilder builder = 192 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 193 for (byte[] family : families) { 194 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 195 } 196 TableDescriptor tableDescriptor = builder.build(); 197 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 198 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 199 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); 200 Path path = new Path(DIR + callingMethod); 201 WAL wal = HBaseTestingUtil.createWal(conf, path, info); 202 HRegion r = HRegion.createHRegion(info, path, conf, tableDescriptor, wal); 203 // this following piece is a hack. currently a coprocessorHost 204 // is secretly loaded at OpenRegionHandler. we don't really 205 // start a region server here, so just manually create cphost 206 // and set it to region. 207 RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf); 208 r.setCoprocessorHost(host); 209 return r; 210 } 211 212 @Test 213 public void testRegionObserverScanTimeStacking() throws Exception { 214 byte[] ROW = Bytes.toBytes("testRow"); 215 byte[] TABLE = Bytes.toBytes(getClass().getName()); 216 byte[] A = Bytes.toBytes("A"); 217 byte[][] FAMILIES = new byte[][] { A }; 218 219 // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking 220 Configuration conf = new HBaseTestingUtil().getConfiguration(); 221 HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); 222 RegionCoprocessorHost h = region.getCoprocessorHost(); 223 h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf); 224 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); 225 226 Put put = new Put(ROW); 227 put.addColumn(A, A, A); 228 region.put(put); 229 230 Get get = new Get(ROW); 231 Result r = region.get(get); 232 assertNull(r.listCells(), "Got an unexpected number of rows - " 233 + "no data should be returned with the NoDataFromScan coprocessor. Found: " + r); 234 HBaseTestingUtil.closeRegionAndWAL(region); 235 } 236 237 @Test 238 public void testRegionObserverFlushTimeStacking() throws Exception { 239 byte[] ROW = Bytes.toBytes("testRow"); 240 byte[] TABLE = Bytes.toBytes(getClass().getName()); 241 byte[] A = Bytes.toBytes("A"); 242 byte[][] FAMILIES = new byte[][] { A }; 243 244 // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking 245 Configuration conf = new HBaseTestingUtil().getConfiguration(); 246 HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); 247 RegionCoprocessorHost h = region.getCoprocessorHost(); 248 h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf); 249 h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); 250 251 // put a row and flush it to disk 252 Put put = new Put(ROW); 253 put.addColumn(A, A, A); 254 region.put(put); 255 region.flush(true); 256 Get get = new Get(ROW); 257 Result r = region.get(get); 258 assertNull(r.listCells(), "Got an unexpected number of rows - " 259 + "no data should be returned with the NoDataFromScan coprocessor. Found: " + r); 260 HBaseTestingUtil.closeRegionAndWAL(region); 261 } 262 263 /* 264 * Custom HRegion which uses CountDownLatch to signal the completion of compaction 265 */ 266 public static class CompactionCompletionNotifyingRegion extends HRegion { 267 private static volatile CountDownLatch compactionStateChangeLatch = null; 268 269 @SuppressWarnings("deprecation") 270 public CompactionCompletionNotifyingRegion(Path tableDir, WAL log, FileSystem fs, 271 Configuration confParam, RegionInfo info, TableDescriptor htd, 272 RegionServerServices rsServices) { 273 super(tableDir, log, fs, confParam, info, htd, rsServices); 274 } 275 276 public CountDownLatch getCompactionStateChangeLatch() { 277 if (compactionStateChangeLatch == null) { 278 compactionStateChangeLatch = new CountDownLatch(1); 279 } 280 return compactionStateChangeLatch; 281 } 282 283 @Override 284 public boolean compact(CompactionContext compaction, HStore store, 285 ThroughputController throughputController) throws IOException { 286 boolean ret = super.compact(compaction, store, throughputController); 287 if (ret) { 288 compactionStateChangeLatch.countDown(); 289 } 290 return ret; 291 } 292 293 @Override 294 public boolean compact(CompactionContext compaction, HStore store, 295 ThroughputController throughputController, User user) throws IOException { 296 boolean ret = super.compact(compaction, store, throughputController, user); 297 if (ret) compactionStateChangeLatch.countDown(); 298 return ret; 299 } 300 } 301 302 /** 303 * Unfortunately, the easiest way to test this is to spin up a mini-cluster since we want to do 304 * the usual compaction mechanism on the region, rather than going through the backdoor to the 305 * region 306 */ 307 @Test 308 public void testRegionObserverCompactionTimeStacking() throws Exception { 309 // setup a mini cluster so we can do a real compaction on a region 310 Configuration conf = UTIL.getConfiguration(); 311 conf.setClass(HConstants.REGION_IMPL, CompactionCompletionNotifyingRegion.class, HRegion.class); 312 conf.setInt("hbase.hstore.compaction.min", 2); 313 UTIL.startMiniCluster(); 314 byte[] ROW = Bytes.toBytes("testRow"); 315 byte[] A = Bytes.toBytes("A"); 316 TableDescriptor tableDescriptor = 317 TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName)) 318 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(A)) 319 .setCoprocessor(CoprocessorDescriptorBuilder 320 .newBuilder(EmptyRegionObsever.class.getName()).setJarPath(null) 321 .setPriority(Coprocessor.PRIORITY_USER).setProperties(Collections.emptyMap()).build()) 322 .setCoprocessor(CoprocessorDescriptorBuilder 323 .newBuilder(NoDataFromCompaction.class.getName()).setJarPath(null) 324 .setPriority(Coprocessor.PRIORITY_HIGHEST).setProperties(Collections.emptyMap()).build()) 325 .build(); 326 327 Admin admin = UTIL.getAdmin(); 328 admin.createTable(tableDescriptor); 329 330 Table table = UTIL.getConnection().getTable(tableDescriptor.getTableName()); 331 332 // put a row and flush it to disk 333 Put put = new Put(ROW); 334 put.addColumn(A, A, A); 335 table.put(put); 336 337 HRegionServer rs = UTIL.getRSForFirstRegionInTable(tableDescriptor.getTableName()); 338 List<HRegion> regions = rs.getRegions(tableDescriptor.getTableName()); 339 assertEquals(1, regions.size(), "More than 1 region serving test table with 1 row"); 340 Region region = regions.get(0); 341 admin.flushRegion(region.getRegionInfo().getRegionName()); 342 CountDownLatch latch = 343 ((CompactionCompletionNotifyingRegion) region).getCompactionStateChangeLatch(); 344 345 // put another row and flush that too 346 put = new Put(Bytes.toBytes("anotherrow")); 347 put.addColumn(A, A, A); 348 table.put(put); 349 admin.flushRegion(region.getRegionInfo().getRegionName()); 350 351 // run a compaction, which normally would should get rid of the data 352 // wait for the compaction checker to complete 353 latch.await(); 354 // check both rows to ensure that they aren't there 355 Get get = new Get(ROW); 356 Result r = table.get(get); 357 assertNull(r.listCells(), "Got an unexpected number of rows - " 358 + "no data should be returned with the NoDataFromScan coprocessor. Found: " + r); 359 360 get = new Get(Bytes.toBytes("anotherrow")); 361 r = table.get(get); 362 assertNull(r.listCells(), "Got an unexpected number of rows - " 363 + "no data should be returned with the NoDataFromScan coprocessor Found: " + r); 364 365 table.close(); 366 UTIL.shutdownMiniCluster(); 367 } 368}