001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ExtendedCell; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.RegionLocator; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.coprocessor.ObserverContext; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 036import org.apache.hadoop.hbase.coprocessor.RegionObserver; 037import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 038import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 039import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.hamcrest.Matchers; 043import org.junit.jupiter.api.AfterAll; 044import org.junit.jupiter.api.BeforeAll; 045import org.junit.jupiter.api.BeforeEach; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048import org.junit.jupiter.api.TestInfo; 049 050@Tag(MediumTests.TAG) 051@Tag(CoprocessorTests.TAG) 052public class TestCompactionWithShippingCoprocessor { 053 054 private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger(); 055 056 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 057 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 058 private String name; 059 060 @BeforeEach 061 public void setTestName(TestInfo testInfo) { 062 this.name = testInfo.getTestMethod().get().getName(); 063 } 064 065 @BeforeAll 066 public static void setUpBeforeClass() throws Exception { 067 Configuration conf = TEST_UTIL.getConfiguration(); 068 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 069 TEST_UTIL.startMiniCluster(1); 070 } 071 072 @AfterAll 073 public static void tearDownAfterClass() throws Exception { 074 TEST_UTIL.shutdownMiniCluster(); 075 } 076 077 /** 078 * Verifies that if a coproc returns an InternalScanner which implements Shipper, the shippped 079 * method is appropriately called in Compactor. 080 */ 081 @Test 082 public void testCoprocScannersExtendingShipperGetShipped() throws Exception { 083 int shippedCountBefore = SHIPPED_COUNT.get(); 084 final TableName tableName = TableName.valueOf(name); 085 // Create a table with block size as 1024 086 final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024, 087 CompactionObserver.class.getName()); 088 TEST_UTIL.loadTable(table, FAMILY); 089 TEST_UTIL.flush(); 090 try { 091 // get the block cache and region 092 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 093 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 094 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 095 // trigger a major compaction 096 TEST_UTIL.compact(true); 097 assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore)); 098 } finally { 099 table.close(); 100 } 101 } 102 103 public static class CompactionObserver implements RegionCoprocessor, RegionObserver { 104 105 @Override 106 public Optional<RegionObserver> getRegionObserver() { 107 return Optional.of(this); 108 } 109 110 @Override 111 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 112 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 113 CompactionRequest request) throws IOException { 114 return new ShippedObservingScanner(scanner); 115 } 116 } 117 118 public static class ShippedObservingScanner implements InternalScanner, Shipper { 119 120 protected final InternalScanner scanner; 121 122 public ShippedObservingScanner(InternalScanner scanner) { 123 this.scanner = scanner; 124 } 125 126 @Override 127 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 128 throws IOException { 129 return scanner.next(result, scannerContext); 130 } 131 132 @Override 133 public void close() throws IOException { 134 scanner.close(); 135 } 136 137 @Override 138 public void shipped() throws IOException { 139 if (scanner instanceof Shipper) { 140 SHIPPED_COUNT.incrementAndGet(); 141 ((Shipper) scanner).shipped(); 142 } 143 } 144 } 145}