/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Confirm that {@link CompactionLifeCycleTracker} works as expected, since we do not use it in
 * our own code.
 * <p>
 * Each test installs a recording {@link Tracker}, requests a compaction on a region that has two
 * store files in {@code CF1} and none in {@code CF2}, waits for the tracker to be completed and
 * then checks which callbacks were invoked for which store. A region coprocessor additionally
 * verifies that the very same tracker instance is handed to every compaction hook.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestCompactionLifeCycleTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName NAME =
    TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());

  private static final byte[] CF1 = Bytes.toBytes("CF1");

  private static final byte[] CF2 = Bytes.toBytes("CF2");

  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");

  private HRegion region;

  /**
   * The tracker the currently running test passed to {@code requestCompaction}, or {@code null}
   * when no test is in flight. It is written by the test methods and read by the coprocessor
   * hooks, so it is declared {@code volatile} to make every write visible to the hooks.
   */
  private static volatile CompactionLifeCycleTracker TRACKER = null;

  /**
   * Region coprocessor that makes sure we pass the correct CompactionLifeCycleTracker to the CP
   * hooks: every compaction hook compares the tracker it receives against {@link #TRACKER}.
   */
  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Asserts that {@code actual} is the tracker registered by the running test. Does nothing
     * when no tracker is registered.
     * <p>
     * {@link #TRACKER} is read exactly once into a local so that a concurrent reset to
     * {@code null} (see {@code tearDown}) cannot slip in between the null check and the
     * comparison.
     * @param actual the tracker instance handed to the coprocessor hook
     */
    private static void assertExpectedTracker(CompactionLifeCycleTracker actual) {
      CompactionLifeCycleTracker expected = TRACKER;
      if (expected != null) {
        // JUnit's contract is assertSame(expected, actual); keep that order so that a failure
        // message labels the two instances correctly.
        assertSame(expected, actual);
      }
    }

    @Override
    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
        throws IOException {
      assertExpectedTracker(tracker);
    }

    @Override
    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
        CompactionRequest request) {
      assertExpectedTracker(tracker);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) throws IOException {
      assertExpectedTracker(tracker);
      // Hand back the scanner untouched; this observer only inspects the tracker.
      return scanner;
    }

    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
        throws IOException {
      assertExpectedTracker(tracker);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
    UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Creates the test table with families {@code CF1} and {@code CF2} and the
   * {@link CompactionObserver} coprocessor, then writes and flushes two batches of rows into
   * {@code CF1}. Afterwards {@code CF1} is expected to hold two store files and {@code CF2} none.
   */
  @Before
  public void setUp() throws IOException {
    UTIL.getAdmin()
      .createTable(TableDescriptorBuilder.newBuilder(NAME)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
        .setCoprocessor(CompactionObserver.class.getName()).build());
    try (Table table = UTIL.getConnection().getTable(NAME)) {
      putRowsAndFlush(table, 0, 100);
      putRowsAndFlush(table, 100, 200);
    }
    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
    assertEquals(2, region.getStore(CF1).getStorefilesCount());
    assertEquals(0, region.getStore(CF2).getStorefilesCount());
  }

  /**
   * Puts one cell into {@code CF1} for every int in {@code [startInclusive, endExclusive)}, using
   * the int's bytes as both row key and value, and then flushes the test table.
   * @param table open handle on the test table
   * @param startInclusive first row value to write
   * @param endExclusive row value at which to stop (not written)
   * @throws IOException if a put or the flush fails
   */
  private static void putRowsAndFlush(Table table, int startInclusive, int endExclusive)
      throws IOException {
    for (int i = startInclusive; i < endExclusive; i++) {
      byte[] row = Bytes.toBytes(i);
      Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
        .setRow(row)
        .setFamily(CF1)
        .setQualifier(QUALIFIER)
        .setTimestamp(HConstants.LATEST_TIMESTAMP)
        .setType(Type.Put)
        .setValue(Bytes.toBytes(i))
        .build();
      table.put(new Put(row).add(cell));
    }
    UTIL.getAdmin().flush(NAME);
  }

  @After
  public void tearDown() throws IOException {
    region = null;
    TRACKER = null;
    UTIL.deleteTable(NAME);
  }

  /**
   * Recording {@link CompactionLifeCycleTracker}: remembers every callback it receives and lets
   * the test thread block in {@link #await()} until {@link #completed()} has been called.
   * <p>
   * The lists are plain {@link ArrayList}s; the tests only read them after {@link #await()} has
   * returned.
   */
  private static final class Tracker implements CompactionLifeCycleTracker {

    /** Stores reported through {@link #notExecuted(Store, String)}, paired with the reason. */
    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();

    /** Stores reported through {@link #beforeExecution(Store)}, in callback order. */
    final List<Store> beforeExecuteStores = new ArrayList<>();

    /** Stores reported through {@link #afterExecution(Store)}, in callback order. */
    final List<Store> afterExecuteStores = new ArrayList<>();

    /** Guarded by this tracker's monitor; set once by {@link #completed()}. */
    private boolean completed = false;

    @Override
    public void notExecuted(Store store, String reason) {
      notExecutedStores.add(Pair.newPair(store, reason));
    }

    @Override
    public void beforeExecution(Store store) {
      beforeExecuteStores.add(store);
    }

    @Override
    public void afterExecution(Store store) {
      afterExecuteStores.add(store);
    }

    @Override
    public synchronized void completed() {
      completed = true;
      notifyAll();
    }

    /**
     * Blocks the caller until {@link #completed()} has been invoked.
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void await() throws InterruptedException {
      // Loop to guard against spurious wake-ups.
      while (!completed) {
        wait();
      }
    }
  }

  /**
   * Requests a compaction on the whole region and expects {@code CF2} to be reported as not
   * executed while {@code CF1} goes through before/after execution.
   */
  @Test
  public void testRequestOnRegion() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));

    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());

    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
  }

  /**
   * Requests a compaction per store: first on {@code CF1}, which is expected to execute, then on
   * {@code CF2}, which is expected to be reported as not executed.
   */
  @Test
  public void testRequestOnStore() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertTrue(tracker.notExecutedStores.isEmpty());
    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());

    // Use a fresh tracker for the second request so the recordings do not mix.
    tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));
    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }

  // This test assumes that compaction wouldn't happen with null user.
  // But null user means system generated compaction so compaction should happen
  // even if the space quota is violated. So this test should be removed/ignored.
  @Ignore @Test
  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
          100L));
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(2, tracker.notExecutedStores.size());
    // Order by column family name so that index 1 is deterministically CF2.
    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
      .compareTo(p2.getFirst().getColumnFamilyName()));

    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(1).getSecond(),
      containsString("space quota violation"));

    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }
}