001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertSame;
023import static org.junit.Assert.assertThat;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.Cell.Type;
032import org.apache.hadoop.hbase.CellBuilderFactory;
033import org.apache.hadoop.hbase.CellBuilderType;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.coprocessor.ObserverContext;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
045import org.apache.hadoop.hbase.coprocessor.RegionObserver;
046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
047import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
048import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
049import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
050import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
051import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
052import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.Pair;
056import org.junit.After;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063
064/**
065 * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own
066 * code.
067 */
068@Category({ CoprocessorTests.class, MediumTests.class })
069public class TestCompactionLifeCycleTracker {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class);
074
075  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
076
077  private static final TableName NAME =
078      TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
079
080  private static final byte[] CF1 = Bytes.toBytes("CF1");
081
082  private static final byte[] CF2 = Bytes.toBytes("CF2");
083
084  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
085
086  private HRegion region;
087
088  private static CompactionLifeCycleTracker TRACKER = null;
089
090  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
091  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
092
093    @Override
094    public Optional<RegionObserver> getRegionObserver() {
095      return Optional.of(this);
096    }
097
098    @Override
099    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
100        List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
101        throws IOException {
102      if (TRACKER != null) {
103        assertSame(tracker, TRACKER);
104      }
105    }
106
107    @Override
108    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
109        List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
110        CompactionRequest request) {
111      if (TRACKER != null) {
112        assertSame(tracker, TRACKER);
113      }
114    }
115
116    @Override
117    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
118        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
119        CompactionRequest request) throws IOException {
120      if (TRACKER != null) {
121        assertSame(tracker, TRACKER);
122      }
123      return scanner;
124    }
125
126    @Override
127    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
128        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
129        throws IOException {
130      if (TRACKER != null) {
131        assertSame(tracker, TRACKER);
132      }
133    }
134  }
135
136  @BeforeClass
137  public static void setUpBeforeClass() throws Exception {
138    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
139    UTIL.startMiniCluster(3);
140  }
141
142  @AfterClass
143  public static void tearDownAfterClass() throws Exception {
144    UTIL.shutdownMiniCluster();
145  }
146
147  @Before
148  public void setUp() throws IOException {
149    UTIL.getAdmin()
150        .createTable(TableDescriptorBuilder.newBuilder(NAME)
151            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
152            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
153            .setCoprocessor(CompactionObserver.class.getName()).build());
154    try (Table table = UTIL.getConnection().getTable(NAME)) {
155      for (int i = 0; i < 100; i++) {
156        byte[] row = Bytes.toBytes(i);
157        table.put(new Put(row)
158                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
159                        .setRow(row)
160                        .setFamily(CF1)
161                        .setQualifier(QUALIFIER)
162                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
163                        .setType(Cell.Type.Put)
164                        .setValue(Bytes.toBytes(i))
165                        .build()));
166      }
167      UTIL.getAdmin().flush(NAME);
168      for (int i = 100; i < 200; i++) {
169        byte[] row = Bytes.toBytes(i);
170        table.put(new Put(row)
171                    .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
172                        .setRow(row)
173                        .setFamily(CF1)
174                        .setQualifier(QUALIFIER)
175                        .setTimestamp(HConstants.LATEST_TIMESTAMP)
176                        .setType(Type.Put)
177                        .setValue(Bytes.toBytes(i))
178                        .build()));
179      }
180      UTIL.getAdmin().flush(NAME);
181    }
182    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
183    assertEquals(2, region.getStore(CF1).getStorefilesCount());
184    assertEquals(0, region.getStore(CF2).getStorefilesCount());
185  }
186
187  @After
188  public void tearDown() throws IOException {
189    region = null;
190    TRACKER = null;
191    UTIL.deleteTable(NAME);
192  }
193
194  private static final class Tracker implements CompactionLifeCycleTracker {
195
196    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();
197
198    final List<Store> beforeExecuteStores = new ArrayList<>();
199
200    final List<Store> afterExecuteStores = new ArrayList<>();
201
202    private boolean completed = false;
203
204    @Override
205    public void notExecuted(Store store, String reason) {
206      notExecutedStores.add(Pair.newPair(store, reason));
207    }
208
209    @Override
210    public void beforeExecution(Store store) {
211      beforeExecuteStores.add(store);
212    }
213
214    @Override
215    public void afterExecution(Store store) {
216      afterExecuteStores.add(store);
217    }
218
219    @Override
220    public synchronized void completed() {
221      completed = true;
222      notifyAll();
223    }
224
225    public synchronized void await() throws InterruptedException {
226      while (!completed) {
227        wait();
228      }
229    }
230  }
231
232  @Test
233  public void testRequestOnRegion() throws IOException, InterruptedException {
234    Tracker tracker = new Tracker();
235    TRACKER = tracker;
236    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
237    tracker.await();
238    assertEquals(1, tracker.notExecutedStores.size());
239    assertEquals(Bytes.toString(CF2),
240      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
241    assertThat(tracker.notExecutedStores.get(0).getSecond(),
242      containsString("compaction request was cancelled"));
243
244    assertEquals(1, tracker.beforeExecuteStores.size());
245    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
246
247    assertEquals(1, tracker.afterExecuteStores.size());
248    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
249  }
250
251  @Test
252  public void testRequestOnStore() throws IOException, InterruptedException {
253    Tracker tracker = new Tracker();
254    TRACKER = tracker;
255    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
256    tracker.await();
257    assertTrue(tracker.notExecutedStores.isEmpty());
258    assertEquals(1, tracker.beforeExecuteStores.size());
259    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
260    assertEquals(1, tracker.afterExecuteStores.size());
261    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
262
263    tracker = new Tracker();
264    TRACKER = tracker;
265    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
266    tracker.await();
267    assertEquals(1, tracker.notExecutedStores.size());
268    assertEquals(Bytes.toString(CF2),
269      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
270    assertThat(tracker.notExecutedStores.get(0).getSecond(),
271      containsString("compaction request was cancelled"));
272    assertTrue(tracker.beforeExecuteStores.isEmpty());
273    assertTrue(tracker.afterExecuteStores.isEmpty());
274  }
275
276  @Test
277  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
278    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
279      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
280          100L));
281    Tracker tracker = new Tracker();
282    TRACKER = tracker;
283    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
284    tracker.await();
285    assertEquals(2, tracker.notExecutedStores.size());
286    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
287        .compareTo(p2.getFirst().getColumnFamilyName()));
288
289    assertEquals(Bytes.toString(CF1),
290      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
291    assertThat(tracker.notExecutedStores.get(0).getSecond(),
292      containsString("space quota violation"));
293
294    assertEquals(Bytes.toString(CF2),
295      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
296    assertThat(tracker.notExecutedStores.get(1).getSecond(),
297      containsString("space quota violation"));
298
299    assertTrue(tracker.beforeExecuteStores.isEmpty());
300    assertTrue(tracker.afterExecuteStores.isEmpty());
301  }
302}