001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertSame; 023import static org.junit.Assert.assertThat; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Optional; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.Cell.Type; 032import org.apache.hadoop.hbase.CellBuilderFactory; 033import org.apache.hadoop.hbase.CellBuilderType; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.coprocessor.ObserverContext; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 045import org.apache.hadoop.hbase.coprocessor.RegionObserver; 046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 047import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 048import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; 049import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 050import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 051import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 052import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.Pair; 056import org.junit.After; 057import org.junit.AfterClass; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063 064/** 065 * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own 066 * code. 067 */ 068@Category({ CoprocessorTests.class, MediumTests.class }) 069public class TestCompactionLifeCycleTracker { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class); 074 075 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 076 077 private static final TableName NAME = 078 TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName()); 079 080 private static final byte[] CF1 = Bytes.toBytes("CF1"); 081 082 private static final byte[] CF2 = Bytes.toBytes("CF2"); 083 084 private static final byte[] QUALIFIER = Bytes.toBytes("CQ"); 085 086 private HRegion region; 087 088 private static CompactionLifeCycleTracker TRACKER = null; 089 090 // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks. 091 public static final class CompactionObserver implements RegionObserver, RegionCoprocessor { 092 093 @Override 094 public Optional<RegionObserver> getRegionObserver() { 095 return Optional.of(this); 096 } 097 098 @Override 099 public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 100 List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) 101 throws IOException { 102 if (TRACKER != null) { 103 assertSame(tracker, TRACKER); 104 } 105 } 106 107 @Override 108 public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 109 List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker, 110 CompactionRequest request) { 111 if (TRACKER != null) { 112 assertSame(tracker, TRACKER); 113 } 114 } 115 116 @Override 117 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 118 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 119 CompactionRequest request) throws IOException { 120 if (TRACKER != null) { 121 assertSame(tracker, TRACKER); 122 } 123 return scanner; 124 } 125 126 @Override 127 public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 128 StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) 129 throws IOException { 130 if (TRACKER != null) { 131 assertSame(tracker, TRACKER); 132 } 133 } 134 } 135 136 @BeforeClass 137 public static void setUpBeforeClass() throws Exception { 138 UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); 139 UTIL.startMiniCluster(3); 140 } 141 142 @AfterClass 143 public static void tearDownAfterClass() throws Exception { 144 UTIL.shutdownMiniCluster(); 145 } 146 147 @Before 148 public void setUp() throws IOException { 149 UTIL.getAdmin() 150 .createTable(TableDescriptorBuilder.newBuilder(NAME) 151 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)) 152 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)) 153 .setCoprocessor(CompactionObserver.class.getName()).build()); 154 try (Table table = UTIL.getConnection().getTable(NAME)) { 155 for (int i = 0; i < 100; i++) { 156 byte[] row = Bytes.toBytes(i); 157 table.put(new Put(row) 158 .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 159 .setRow(row) 160 .setFamily(CF1) 161 .setQualifier(QUALIFIER) 162 .setTimestamp(HConstants.LATEST_TIMESTAMP) 163 .setType(Cell.Type.Put) 164 .setValue(Bytes.toBytes(i)) 165 .build())); 166 } 167 UTIL.getAdmin().flush(NAME); 168 for (int i = 100; i < 200; i++) { 169 byte[] row = Bytes.toBytes(i); 170 table.put(new Put(row) 171 .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 172 .setRow(row) 173 .setFamily(CF1) 174 .setQualifier(QUALIFIER) 175 .setTimestamp(HConstants.LATEST_TIMESTAMP) 176 .setType(Type.Put) 177 .setValue(Bytes.toBytes(i)) 178 .build())); 179 } 180 UTIL.getAdmin().flush(NAME); 181 } 182 region = UTIL.getHBaseCluster().getRegions(NAME).get(0); 183 assertEquals(2, region.getStore(CF1).getStorefilesCount()); 184 assertEquals(0, region.getStore(CF2).getStorefilesCount()); 185 } 186 187 @After 188 public void tearDown() throws IOException { 189 region = null; 190 TRACKER = null; 191 UTIL.deleteTable(NAME); 192 } 193 194 private static final class Tracker implements CompactionLifeCycleTracker { 195 196 final List<Pair<Store, String>> notExecutedStores = new ArrayList<>(); 197 198 final List<Store> beforeExecuteStores = new ArrayList<>(); 199 200 final List<Store> afterExecuteStores = new ArrayList<>(); 201 202 private boolean completed = false; 203 204 @Override 205 public void notExecuted(Store store, String reason) { 206 notExecutedStores.add(Pair.newPair(store, reason)); 207 } 208 209 @Override 210 public void beforeExecution(Store store) { 211 beforeExecuteStores.add(store); 212 } 213 214 @Override 215 public void afterExecution(Store store) { 216 afterExecuteStores.add(store); 217 } 218 219 @Override 220 public synchronized void completed() { 221 completed = true; 222 notifyAll(); 223 } 224 225 public synchronized void await() throws InterruptedException { 226 while (!completed) { 227 wait(); 228 } 229 } 230 } 231 232 @Test 233 public void testRequestOnRegion() throws IOException, InterruptedException { 234 Tracker tracker = new Tracker(); 235 TRACKER = tracker; 236 region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); 237 tracker.await(); 238 assertEquals(1, tracker.notExecutedStores.size()); 239 assertEquals(Bytes.toString(CF2), 240 tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); 241 assertThat(tracker.notExecutedStores.get(0).getSecond(), 242 containsString("compaction request was cancelled")); 243 244 assertEquals(1, tracker.beforeExecuteStores.size()); 245 assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); 246 247 assertEquals(1, tracker.afterExecuteStores.size()); 248 assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); 249 } 250 251 @Test 252 public void testRequestOnStore() throws IOException, InterruptedException { 253 Tracker tracker = new Tracker(); 254 TRACKER = tracker; 255 region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker); 256 tracker.await(); 257 assertTrue(tracker.notExecutedStores.isEmpty()); 258 assertEquals(1, tracker.beforeExecuteStores.size()); 259 assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); 260 assertEquals(1, tracker.afterExecuteStores.size()); 261 assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); 262 263 tracker = new Tracker(); 264 TRACKER = tracker; 265 region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker); 266 tracker.await(); 267 assertEquals(1, tracker.notExecutedStores.size()); 268 assertEquals(Bytes.toString(CF2), 269 tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); 270 assertThat(tracker.notExecutedStores.get(0).getSecond(), 271 containsString("compaction request was cancelled")); 272 assertTrue(tracker.beforeExecuteStores.isEmpty()); 273 assertTrue(tracker.afterExecuteStores.isEmpty()); 274 } 275 276 @Test 277 public void testSpaceQuotaViolation() throws IOException, InterruptedException { 278 region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME, 279 new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L, 280 100L)); 281 Tracker tracker = new Tracker(); 282 TRACKER = tracker; 283 region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); 284 tracker.await(); 285 assertEquals(2, tracker.notExecutedStores.size()); 286 tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName() 287 .compareTo(p2.getFirst().getColumnFamilyName())); 288 289 assertEquals(Bytes.toString(CF1), 290 tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); 291 assertThat(tracker.notExecutedStores.get(0).getSecond(), 292 containsString("space quota violation")); 293 294 assertEquals(Bytes.toString(CF2), 295 tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName()); 296 assertThat(tracker.notExecutedStores.get(1).getSecond(), 297 containsString("space quota violation")); 298 299 assertTrue(tracker.beforeExecuteStores.isEmpty()); 300 assertTrue(tracker.afterExecuteStores.isEmpty()); 301 } 302}