001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertSame;
023import static org.junit.Assert.assertThat;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.Cell.Type;
032import org.apache.hadoop.hbase.CellBuilderFactory;
033import org.apache.hadoop.hbase.CellBuilderType;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.coprocessor.ObserverContext;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
045import org.apache.hadoop.hbase.coprocessor.RegionObserver;
046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
047import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
048import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
049import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
050import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
051import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.Pair;
056import org.junit.After;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Ignore;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064
065/**
066 * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own
067 * code.
068 */
069@Category({ CoprocessorTests.class, MediumTests.class })
070public class TestCompactionLifeCycleTracker {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class);
075
076  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
077
078  private static final TableName NAME =
079      TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
080
081  private static final byte[] CF1 = Bytes.toBytes("CF1");
082
083  private static final byte[] CF2 = Bytes.toBytes("CF2");
084
085  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
086
087  private HRegion region;
088
089  private static CompactionLifeCycleTracker TRACKER = null;
090
091  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
092  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
093
094    @Override
095    public Optional<RegionObserver> getRegionObserver() {
096      return Optional.of(this);
097    }
098
099    @Override
100    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
101        List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
102        throws IOException {
103      if (TRACKER != null) {
104        assertSame(tracker, TRACKER);
105      }
106    }
107
108    @Override
109    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
110        List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
111        CompactionRequest request) {
112      if (TRACKER != null) {
113        assertSame(tracker, TRACKER);
114      }
115    }
116
117    @Override
118    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
119        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
120        CompactionRequest request) throws IOException {
121      if (TRACKER != null) {
122        assertSame(tracker, TRACKER);
123      }
124      return scanner;
125    }
126
127    @Override
128    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
129        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
130        throws IOException {
131      if (TRACKER != null) {
132        assertSame(tracker, TRACKER);
133      }
134    }
135  }
136
137  @BeforeClass
138  public static void setUpBeforeClass() throws Exception {
139    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
140    UTIL.startMiniCluster(3);
141  }
142
143  @AfterClass
144  public static void tearDownAfterClass() throws Exception {
145    UTIL.shutdownMiniCluster();
146  }
147
148  @Before
149  public void setUp() throws IOException {
150    UTIL.getAdmin()
151        .createTable(TableDescriptorBuilder.newBuilder(NAME)
152            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
153            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
154            .setCoprocessor(CompactionObserver.class.getName()).build());
155    try (Table table = UTIL.getConnection().getTable(NAME)) {
156      for (int i = 0; i < 100; i++) {
157        byte[] row = Bytes.toBytes(i);
158        table.put(new Put(row)
159                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
160                        .setRow(row)
161                        .setFamily(CF1)
162                        .setQualifier(QUALIFIER)
163                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
164                        .setType(Cell.Type.Put)
165                        .setValue(Bytes.toBytes(i))
166                        .build()));
167      }
168      UTIL.getAdmin().flush(NAME);
169      for (int i = 100; i < 200; i++) {
170        byte[] row = Bytes.toBytes(i);
171        table.put(new Put(row)
172                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
173                        .setRow(row)
174                        .setFamily(CF1)
175                        .setQualifier(QUALIFIER)
176                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
177                        .setType(Type.Put)
178                        .setValue(Bytes.toBytes(i))
179                        .build()));
180      }
181      UTIL.getAdmin().flush(NAME);
182    }
183    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
184    assertEquals(2, region.getStore(CF1).getStorefilesCount());
185    assertEquals(0, region.getStore(CF2).getStorefilesCount());
186  }
187
188  @After
189  public void tearDown() throws IOException {
190    region = null;
191    TRACKER = null;
192    UTIL.deleteTable(NAME);
193  }
194
195  private static final class Tracker implements CompactionLifeCycleTracker {
196
197    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();
198
199    final List<Store> beforeExecuteStores = new ArrayList<>();
200
201    final List<Store> afterExecuteStores = new ArrayList<>();
202
203    private boolean completed = false;
204
205    @Override
206    public void notExecuted(Store store, String reason) {
207      notExecutedStores.add(Pair.newPair(store, reason));
208    }
209
210    @Override
211    public void beforeExecution(Store store) {
212      beforeExecuteStores.add(store);
213    }
214
215    @Override
216    public void afterExecution(Store store) {
217      afterExecuteStores.add(store);
218    }
219
220    @Override
221    public synchronized void completed() {
222      completed = true;
223      notifyAll();
224    }
225
226    public synchronized void await() throws InterruptedException {
227      while (!completed) {
228        wait();
229      }
230    }
231  }
232
233  @Test
234  public void testRequestOnRegion() throws IOException, InterruptedException {
235    Tracker tracker = new Tracker();
236    TRACKER = tracker;
237    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
238    tracker.await();
239    assertEquals(1, tracker.notExecutedStores.size());
240    assertEquals(Bytes.toString(CF2),
241      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
242    assertThat(tracker.notExecutedStores.get(0).getSecond(),
243      containsString("compaction request was cancelled"));
244
245    assertEquals(1, tracker.beforeExecuteStores.size());
246    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
247
248    assertEquals(1, tracker.afterExecuteStores.size());
249    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
250  }
251
252  @Test
253  public void testRequestOnStore() throws IOException, InterruptedException {
254    Tracker tracker = new Tracker();
255    TRACKER = tracker;
256    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
257    tracker.await();
258    assertTrue(tracker.notExecutedStores.isEmpty());
259    assertEquals(1, tracker.beforeExecuteStores.size());
260    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
261    assertEquals(1, tracker.afterExecuteStores.size());
262    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
263
264    tracker = new Tracker();
265    TRACKER = tracker;
266    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
267    tracker.await();
268    assertEquals(1, tracker.notExecutedStores.size());
269    assertEquals(Bytes.toString(CF2),
270      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
271    assertThat(tracker.notExecutedStores.get(0).getSecond(),
272      containsString("compaction request was cancelled"));
273    assertTrue(tracker.beforeExecuteStores.isEmpty());
274    assertTrue(tracker.afterExecuteStores.isEmpty());
275  }
276
277  // This test assumes that compaction wouldn't happen with null user.
278  // But null user means system generated compaction so compaction should happen
279  // even if the space quota is violated. So this test should be removed/ignored.
280  @Ignore @Test
281  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
282    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
283      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
284          100L));
285    Tracker tracker = new Tracker();
286    TRACKER = tracker;
287    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
288    tracker.await();
289    assertEquals(2, tracker.notExecutedStores.size());
290    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
291        .compareTo(p2.getFirst().getColumnFamilyName()));
292
293    assertEquals(Bytes.toString(CF2),
294      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
295    assertThat(tracker.notExecutedStores.get(1).getSecond(),
296      containsString("space quota violation"));
297
298    assertTrue(tracker.beforeExecuteStores.isEmpty());
299    assertTrue(tracker.afterExecuteStores.isEmpty());
300  }
301}