/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Confirm that CompactionLifeCycleTracker works as expected, since we do not use it in our own
 * code.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestCompactionLifeCycleTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName NAME =
    TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());

  private static final byte[] CF1 = Bytes.toBytes("CF1");

  private static final byte[] CF2 = Bytes.toBytes("CF2");

  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");

  private HRegion region;

  private static CompactionLifeCycleTracker TRACKER = null;

  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }

    @Override
    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
      return scanner;
    }

    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
      throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
    UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws IOException {
    UTIL.getAdmin()
      .createTable(TableDescriptorBuilder.newBuilder(NAME)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
        .setCoprocessor(CompactionObserver.class.getName()).build());
    try (Table table = UTIL.getConnection().getTable(NAME)) {
      for (int i = 0; i < 100; i++) {
        byte[] row = Bytes.toBytes(i);
        table
          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
            .setType(Cell.Type.Put).setValue(Bytes.toBytes(i)).build()));
      }
      UTIL.getAdmin().flush(NAME);
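      // Write a second batch of rows and flush again so that CF1 ends up with two store files
      // (enough to be selected for compaction, since HBASE_HSTORE_COMPACTION_MIN_KEY is set to 2
      // in setUpBeforeClass), while CF2 stays empty.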
      for (int i = 100; i < 200; i++) {
        byte[] row = Bytes.toBytes(i);
        table
          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
      }
      UTIL.getAdmin().flush(NAME);
    }
    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
    assertEquals(2, region.getStore(CF1).getStorefilesCount());
    assertEquals(0, region.getStore(CF2).getStorefilesCount());
  }

  @After
  public void tearDown() throws IOException {
    region = null;
    TRACKER = null;
    UTIL.deleteTable(NAME);
  }

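  // A CompactionLifeCycleTracker that records the stores passed to each callback and lets the
  // test block until the whole compaction request has completed.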
  private static final class Tracker implements CompactionLifeCycleTracker {

    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();

    final List<Store> beforeExecuteStores = new ArrayList<>();

    final List<Store> afterExecuteStores = new ArrayList<>();

    private boolean completed = false;

    @Override
    public void notExecuted(Store store, String reason) {
      notExecutedStores.add(Pair.newPair(store, reason));
    }

    @Override
    public void beforeExecution(Store store) {
      beforeExecuteStores.add(store);
    }

    @Override
    public void afterExecution(Store store) {
      afterExecuteStores.add(store);
    }

    @Override
    public synchronized void completed() {
      completed = true;
      notifyAll();
    }

    public synchronized void await() throws InterruptedException {
      while (!completed) {
        wait();
      }
    }
  }

  @Test
  public void testRequestOnRegion() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));

    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());

    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
  }

  @Test
  public void testRequestOnStore() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertTrue(tracker.notExecutedStores.isEmpty());
    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());

    tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));
    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }

  // This test assumed that compaction would not happen with a null user. But a null user means a
  // system-generated compaction, so the compaction should happen even if the space quota is
  // violated. Hence this test should be removed/ignored.
  @Ignore
  @Test
  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
        100L));
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(2, tracker.notExecutedStores.size());
    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
      .compareTo(p2.getFirst().getColumnFamilyName()));

    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(1).getSecond(),
      containsString("space quota violation"));

    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }
}