001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertSame; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Optional; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.Cell.Type; 032import org.apache.hadoop.hbase.CellBuilderFactory; 033import org.apache.hadoop.hbase.CellBuilderType; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 047import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; 048import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 049import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 050import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 051import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.Pair; 055import org.junit.jupiter.api.AfterAll; 056import org.junit.jupiter.api.AfterEach; 057import org.junit.jupiter.api.BeforeAll; 058import org.junit.jupiter.api.BeforeEach; 059import org.junit.jupiter.api.Disabled; 060import org.junit.jupiter.api.Tag; 061import org.junit.jupiter.api.Test; 062 063/** 064 * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own 065 * code. 066 */ 067@Tag(CoprocessorTests.TAG) 068@Tag(MediumTests.TAG) 069public class TestCompactionLifeCycleTracker { 070 071 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 072 073 private static final TableName NAME = 074 TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName()); 075 076 private static final byte[] CF1 = Bytes.toBytes("CF1"); 077 078 private static final byte[] CF2 = Bytes.toBytes("CF2"); 079 080 private static final byte[] QUALIFIER = Bytes.toBytes("CQ"); 081 082 private HRegion region; 083 084 private static CompactionLifeCycleTracker TRACKER = null; 085 086 // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks. 087 public static final class CompactionObserver implements RegionObserver, RegionCoprocessor { 088 089 @Override 090 public Optional<RegionObserver> getRegionObserver() { 091 return Optional.of(this); 092 } 093 094 @Override 095 public void preCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c, 096 Store store, List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) 097 throws IOException { 098 if (TRACKER != null) { 099 assertSame(tracker, TRACKER); 100 } 101 } 102 103 @Override 104 public void postCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c, 105 Store store, List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker, 106 CompactionRequest request) { 107 if (TRACKER != null) { 108 assertSame(tracker, TRACKER); 109 } 110 } 111 112 @Override 113 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 114 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 115 CompactionRequest request) throws IOException { 116 if (TRACKER != null) { 117 assertSame(tracker, TRACKER); 118 } 119 return scanner; 120 } 121 122 @Override 123 public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, 124 StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) 125 throws IOException { 126 if (TRACKER != null) { 127 assertSame(tracker, TRACKER); 128 } 129 } 130 } 131 132 @BeforeAll 133 public static void setUpBeforeClass() throws Exception { 134 UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); 135 UTIL.startMiniCluster(3); 136 } 137 138 @AfterAll 139 public static void tearDownAfterClass() throws Exception { 140 UTIL.shutdownMiniCluster(); 141 } 142 143 @BeforeEach 144 public void setUp() throws IOException { 145 UTIL.getAdmin() 146 .createTable(TableDescriptorBuilder.newBuilder(NAME) 147 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)) 148 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)) 149 .setCoprocessor(CompactionObserver.class.getName()).build()); 150 try (Table table = UTIL.getConnection().getTable(NAME)) { 151 for (int i = 0; i < 100; i++) { 152 byte[] row = Bytes.toBytes(i); 153 table 154 .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 155 .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP) 156 .setType(Cell.Type.Put).setValue(Bytes.toBytes(i)).build())); 157 } 158 UTIL.getAdmin().flush(NAME); 159 for (int i = 100; i < 200; i++) { 160 byte[] row = Bytes.toBytes(i); 161 table 162 .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 163 .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP) 164 .setType(Type.Put).setValue(Bytes.toBytes(i)).build())); 165 } 166 UTIL.getAdmin().flush(NAME); 167 } 168 region = UTIL.getHBaseCluster().getRegions(NAME).get(0); 169 assertEquals(2, region.getStore(CF1).getStorefilesCount()); 170 assertEquals(0, region.getStore(CF2).getStorefilesCount()); 171 } 172 173 @AfterEach 174 public void tearDown() throws IOException { 175 region = null; 176 TRACKER = null; 177 UTIL.deleteTable(NAME); 178 } 179 180 private static final class Tracker implements CompactionLifeCycleTracker { 181 182 final List<Pair<Store, String>> notExecutedStores = new ArrayList<>(); 183 184 final List<Store> beforeExecuteStores = new ArrayList<>(); 185 186 final List<Store> afterExecuteStores = new ArrayList<>(); 187 188 private boolean completed = false; 189 190 @Override 191 public void notExecuted(Store store, String reason) { 192 notExecutedStores.add(Pair.newPair(store, reason)); 193 } 194 195 @Override 196 public void beforeExecution(Store store) { 197 beforeExecuteStores.add(store); 198 } 199 200 @Override 201 public void afterExecution(Store store) { 202 afterExecuteStores.add(store); 203 } 204 205 @Override 206 public synchronized void completed() { 207 completed = true; 208 notifyAll(); 209 } 210 211 public synchronized void await() throws InterruptedException { 212 while (!completed) { 213 wait(); 214 } 215 } 216 } 217 218 @Test 219 public void testRequestOnRegion() throws IOException, InterruptedException { 220 Tracker tracker = new Tracker(); 221 TRACKER = tracker; 222 region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); 223 tracker.await(); 224 assertEquals(1, tracker.notExecutedStores.size()); 225 assertEquals(Bytes.toString(CF2), 226 tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); 227 assertThat(tracker.notExecutedStores.get(0).getSecond(), 228 containsString("compaction request was cancelled")); 229 230 assertEquals(1, tracker.beforeExecuteStores.size()); 231 assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); 232 233 assertEquals(1, tracker.afterExecuteStores.size()); 234 assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); 235 } 236 237 @Test 238 public void testRequestOnStore() throws IOException, InterruptedException { 239 Tracker tracker = new Tracker(); 240 TRACKER = tracker; 241 region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker); 242 tracker.await(); 243 assertTrue(tracker.notExecutedStores.isEmpty()); 244 assertEquals(1, tracker.beforeExecuteStores.size()); 245 assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); 246 assertEquals(1, tracker.afterExecuteStores.size()); 247 assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); 248 249 tracker = new Tracker(); 250 TRACKER = tracker; 251 region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker); 252 tracker.await(); 253 assertEquals(1, tracker.notExecutedStores.size()); 254 assertEquals(Bytes.toString(CF2), 255 tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); 256 assertThat(tracker.notExecutedStores.get(0).getSecond(), 257 containsString("compaction request was cancelled")); 258 assertTrue(tracker.beforeExecuteStores.isEmpty()); 259 assertTrue(tracker.afterExecuteStores.isEmpty()); 260 } 261 262 // This test assumes that compaction wouldn't happen with null user. 263 // But null user means system generated compaction so compaction should happen 264 // even if the space quota is violated. So this test should be removed/ignored. 265 @Disabled 266 @Test 267 public void testSpaceQuotaViolation() throws IOException, InterruptedException { 268 region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME, 269 new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L, 270 100L)); 271 Tracker tracker = new Tracker(); 272 TRACKER = tracker; 273 region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); 274 tracker.await(); 275 assertEquals(2, tracker.notExecutedStores.size()); 276 tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName() 277 .compareTo(p2.getFirst().getColumnFamilyName())); 278 279 assertEquals(Bytes.toString(CF2), 280 tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName()); 281 assertThat(tracker.notExecutedStores.get(1).getSecond(), 282 containsString("space quota violation")); 283 284 assertTrue(tracker.beforeExecuteStores.isEmpty()); 285 assertTrue(tracker.afterExecuteStores.isEmpty()); 286 } 287}