001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertSame;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.Cell.Type;
032import org.apache.hadoop.hbase.CellBuilderFactory;
033import org.apache.hadoop.hbase.CellBuilderType;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
047import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
048import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
049import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
050import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
051import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.Pair;
055import org.junit.jupiter.api.AfterAll;
056import org.junit.jupiter.api.AfterEach;
057import org.junit.jupiter.api.BeforeAll;
058import org.junit.jupiter.api.BeforeEach;
059import org.junit.jupiter.api.Disabled;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.Test;
062
063/**
064 * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own
065 * code.
066 */
067@Tag(CoprocessorTests.TAG)
068@Tag(MediumTests.TAG)
069public class TestCompactionLifeCycleTracker {
070
071  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
072
073  private static final TableName NAME =
074    TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
075
076  private static final byte[] CF1 = Bytes.toBytes("CF1");
077
078  private static final byte[] CF2 = Bytes.toBytes("CF2");
079
080  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
081
082  private HRegion region;
083
084  private static CompactionLifeCycleTracker TRACKER = null;
085
086  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
087  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
088
089    @Override
090    public Optional<RegionObserver> getRegionObserver() {
091      return Optional.of(this);
092    }
093
094    @Override
095    public void preCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c,
096      Store store, List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
097      throws IOException {
098      if (TRACKER != null) {
099        assertSame(tracker, TRACKER);
100      }
101    }
102
103    @Override
104    public void postCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c,
105      Store store, List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
106      CompactionRequest request) {
107      if (TRACKER != null) {
108        assertSame(tracker, TRACKER);
109      }
110    }
111
112    @Override
113    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
114      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
115      CompactionRequest request) throws IOException {
116      if (TRACKER != null) {
117        assertSame(tracker, TRACKER);
118      }
119      return scanner;
120    }
121
122    @Override
123    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store,
124      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
125      throws IOException {
126      if (TRACKER != null) {
127        assertSame(tracker, TRACKER);
128      }
129    }
130  }
131
132  @BeforeAll
133  public static void setUpBeforeClass() throws Exception {
134    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
135    UTIL.startMiniCluster(3);
136  }
137
138  @AfterAll
139  public static void tearDownAfterClass() throws Exception {
140    UTIL.shutdownMiniCluster();
141  }
142
143  @BeforeEach
144  public void setUp() throws IOException {
145    UTIL.getAdmin()
146      .createTable(TableDescriptorBuilder.newBuilder(NAME)
147        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
148        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
149        .setCoprocessor(CompactionObserver.class.getName()).build());
150    try (Table table = UTIL.getConnection().getTable(NAME)) {
151      for (int i = 0; i < 100; i++) {
152        byte[] row = Bytes.toBytes(i);
153        table
154          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
155            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
156            .setType(Cell.Type.Put).setValue(Bytes.toBytes(i)).build()));
157      }
158      UTIL.getAdmin().flush(NAME);
159      for (int i = 100; i < 200; i++) {
160        byte[] row = Bytes.toBytes(i);
161        table
162          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
163            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
164            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
165      }
166      UTIL.getAdmin().flush(NAME);
167    }
168    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
169    assertEquals(2, region.getStore(CF1).getStorefilesCount());
170    assertEquals(0, region.getStore(CF2).getStorefilesCount());
171  }
172
173  @AfterEach
174  public void tearDown() throws IOException {
175    region = null;
176    TRACKER = null;
177    UTIL.deleteTable(NAME);
178  }
179
180  private static final class Tracker implements CompactionLifeCycleTracker {
181
182    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();
183
184    final List<Store> beforeExecuteStores = new ArrayList<>();
185
186    final List<Store> afterExecuteStores = new ArrayList<>();
187
188    private boolean completed = false;
189
190    @Override
191    public void notExecuted(Store store, String reason) {
192      notExecutedStores.add(Pair.newPair(store, reason));
193    }
194
195    @Override
196    public void beforeExecution(Store store) {
197      beforeExecuteStores.add(store);
198    }
199
200    @Override
201    public void afterExecution(Store store) {
202      afterExecuteStores.add(store);
203    }
204
205    @Override
206    public synchronized void completed() {
207      completed = true;
208      notifyAll();
209    }
210
211    public synchronized void await() throws InterruptedException {
212      while (!completed) {
213        wait();
214      }
215    }
216  }
217
218  @Test
219  public void testRequestOnRegion() throws IOException, InterruptedException {
220    Tracker tracker = new Tracker();
221    TRACKER = tracker;
222    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
223    tracker.await();
224    assertEquals(1, tracker.notExecutedStores.size());
225    assertEquals(Bytes.toString(CF2),
226      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
227    assertThat(tracker.notExecutedStores.get(0).getSecond(),
228      containsString("compaction request was cancelled"));
229
230    assertEquals(1, tracker.beforeExecuteStores.size());
231    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
232
233    assertEquals(1, tracker.afterExecuteStores.size());
234    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
235  }
236
237  @Test
238  public void testRequestOnStore() throws IOException, InterruptedException {
239    Tracker tracker = new Tracker();
240    TRACKER = tracker;
241    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
242    tracker.await();
243    assertTrue(tracker.notExecutedStores.isEmpty());
244    assertEquals(1, tracker.beforeExecuteStores.size());
245    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
246    assertEquals(1, tracker.afterExecuteStores.size());
247    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
248
249    tracker = new Tracker();
250    TRACKER = tracker;
251    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
252    tracker.await();
253    assertEquals(1, tracker.notExecutedStores.size());
254    assertEquals(Bytes.toString(CF2),
255      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
256    assertThat(tracker.notExecutedStores.get(0).getSecond(),
257      containsString("compaction request was cancelled"));
258    assertTrue(tracker.beforeExecuteStores.isEmpty());
259    assertTrue(tracker.afterExecuteStores.isEmpty());
260  }
261
262  // This test assumes that compaction wouldn't happen with null user.
263  // But null user means system generated compaction so compaction should happen
264  // even if the space quota is violated. So this test should be removed/ignored.
265  @Disabled
266  @Test
267  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
268    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
269      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
270        100L));
271    Tracker tracker = new Tracker();
272    TRACKER = tracker;
273    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
274    tracker.await();
275    assertEquals(2, tracker.notExecutedStores.size());
276    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
277      .compareTo(p2.getFirst().getColumnFamilyName()));
278
279    assertEquals(Bytes.toString(CF2),
280      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
281    assertThat(tracker.notExecutedStores.get(1).getSecond(),
282      containsString("space quota violation"));
283
284    assertTrue(tracker.beforeExecuteStores.isEmpty());
285    assertTrue(tracker.afterExecuteStores.isEmpty());
286  }
287}