001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.RegionLocator;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
038import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
039import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.hamcrest.Matchers;
043import org.junit.jupiter.api.AfterAll;
044import org.junit.jupiter.api.BeforeAll;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.junit.jupiter.api.TestInfo;
049
050@Tag(MediumTests.TAG)
051@Tag(CoprocessorTests.TAG)
052public class TestCompactionWithShippingCoprocessor {
053
054  private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger();
055
056  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
057  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
058  private String name;
059
060  @BeforeEach
061  public void setTestName(TestInfo testInfo) {
062    this.name = testInfo.getTestMethod().get().getName();
063  }
064
065  @BeforeAll
066  public static void setUpBeforeClass() throws Exception {
067    Configuration conf = TEST_UTIL.getConfiguration();
068    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
069    TEST_UTIL.startMiniCluster(1);
070  }
071
072  @AfterAll
073  public static void tearDownAfterClass() throws Exception {
074    TEST_UTIL.shutdownMiniCluster();
075  }
076
077  /**
078   * Verifies that if a coproc returns an InternalScanner which implements Shipper, the shippped
079   * method is appropriately called in Compactor.
080   */
081  @Test
082  public void testCoprocScannersExtendingShipperGetShipped() throws Exception {
083    int shippedCountBefore = SHIPPED_COUNT.get();
084    final TableName tableName = TableName.valueOf(name);
085    // Create a table with block size as 1024
086    final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024,
087      CompactionObserver.class.getName());
088    TEST_UTIL.loadTable(table, FAMILY);
089    TEST_UTIL.flush();
090    try {
091      // get the block cache and region
092      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
093      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
094      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
095      // trigger a major compaction
096      TEST_UTIL.compact(true);
097      assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore));
098    } finally {
099      table.close();
100    }
101  }
102
103  public static class CompactionObserver implements RegionCoprocessor, RegionObserver {
104
105    @Override
106    public Optional<RegionObserver> getRegionObserver() {
107      return Optional.of(this);
108    }
109
110    @Override
111    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
112      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
113      CompactionRequest request) throws IOException {
114      return new ShippedObservingScanner(scanner);
115    }
116  }
117
118  public static class ShippedObservingScanner implements InternalScanner, Shipper {
119
120    protected final InternalScanner scanner;
121
122    public ShippedObservingScanner(InternalScanner scanner) {
123      this.scanner = scanner;
124    }
125
126    @Override
127    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
128      throws IOException {
129      return scanner.next(result, scannerContext);
130    }
131
132    @Override
133    public void close() throws IOException {
134      scanner.close();
135    }
136
137    @Override
138    public void shipped() throws IOException {
139      if (scanner instanceof Shipper) {
140        SHIPPED_COUNT.incrementAndGet();
141        ((Shipper) scanner).shipped();
142      }
143    }
144  }
145}