001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.COPROCESSORS_ENABLED_CONF_KEY; 021import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; 022import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR; 023import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.Optional; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CellComparator; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.KeepDeletedCells; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.io.TimeRange; 044import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 045import org.apache.hadoop.hbase.regionserver.HRegion; 046import org.apache.hadoop.hbase.regionserver.HStore; 047import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; 048import org.apache.hadoop.hbase.regionserver.RegionServerServices; 049import org.apache.hadoop.hbase.regionserver.ScanInfo; 050import org.apache.hadoop.hbase.regionserver.ScanType; 051import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 052import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.testclassification.SmallTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.junit.jupiter.api.BeforeEach; 057import org.junit.jupiter.api.Tag; 058import org.junit.jupiter.api.Test; 059import org.junit.jupiter.api.TestInfo; 060 061@Tag(SmallTests.TAG) 062public class TestRegionCoprocessorHost { 063 private Configuration conf; 064 065 private String currentTestName; 066 private RegionInfo regionInfo; 067 private HRegion region; 068 private RegionServerServices rsServices; 069 public static final int MAX_VERSIONS = 3; 070 public static final int MIN_VERSIONS = 2; 071 public static final int TTL = 1000; 072 public static final int TIME_TO_PURGE_DELETES = 2000; 073 074 @BeforeEach 075 public void setup(TestInfo testInfo) throws IOException { 076 currentTestName = testInfo.getTestMethod().get().getName(); 077 init(null); 078 } 079 080 private void init(Boolean flag) throws IOException { 081 conf = HBaseConfiguration.create(); 082 conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true); 083 conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true); 084 TableName tableName = TableName.valueOf(currentTestName); 085 regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 086 TableDescriptor tableDesc = null; 087 if (flag == null) { 088 // configure a coprocessor which override postScannerFilterRow 089 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 090 .setCoprocessor(SimpleRegionObserver.class.getName()).build(); 091 } else if (flag) { 092 // configure a coprocessor which don't override postScannerFilterRow 093 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 094 .setCoprocessor(TempRegionObserver.class.getName()).build(); 095 } else { 096 // configure two coprocessors, one don't override postScannerFilterRow but another one does 097 conf.set(REGION_COPROCESSOR_CONF_KEY, TempRegionObserver.class.getName()); 098 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 099 .setCoprocessor(SimpleRegionObserver.class.getName()).build(); 100 } 101 region = mock(HRegion.class); 102 when(region.getRegionInfo()).thenReturn(regionInfo); 103 when(region.getTableDescriptor()).thenReturn(tableDesc); 104 rsServices = mock(RegionServerServices.class); 105 } 106 107 @Test 108 public void testLoadDuplicateCoprocessor() throws Exception { 109 conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true); 110 conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName()); 111 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 112 // Only one coprocessor SimpleRegionObserver loaded 113 assertEquals(1, host.coprocEnvironments.size()); 114 115 // Allow to load duplicate coprocessor 116 conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, false); 117 host = new RegionCoprocessorHost(region, rsServices, conf); 118 // Two duplicate coprocessors loaded 119 assertEquals(2, host.coprocEnvironments.size()); 120 } 121 122 @Test 123 public void testPreStoreScannerOpen() throws IOException { 124 125 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 126 Scan scan = new Scan(); 127 scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP); 128 assertTrue(scan.getTimeRange().isAllTime(), "Scan is not for all time"); 129 // SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan 130 // is for all time. this lets us exercise both that the Scan is wired up properly in the coproc 131 // and that we can customize the metadata 132 133 ScanInfo oldScanInfo = getScanInfo(); 134 135 HStore store = mock(HStore.class); 136 when(store.getScanInfo()).thenReturn(oldScanInfo); 137 ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan); 138 139 verifyScanInfo(newScanInfo); 140 } 141 142 @Test 143 public void testPreCompactScannerOpen() throws IOException { 144 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 145 ScanInfo oldScanInfo = getScanInfo(); 146 HStore store = mock(HStore.class); 147 when(store.getScanInfo()).thenReturn(oldScanInfo); 148 ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES, 149 mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class)); 150 verifyScanInfo(newScanInfo); 151 } 152 153 @Test 154 public void testPreFlushScannerOpen() throws IOException { 155 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 156 ScanInfo oldScanInfo = getScanInfo(); 157 HStore store = mock(HStore.class); 158 when(store.getScanInfo()).thenReturn(oldScanInfo); 159 ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class)); 160 verifyScanInfo(newScanInfo); 161 } 162 163 @Test 164 public void testPreMemStoreCompactionCompactScannerOpen() throws IOException { 165 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 166 ScanInfo oldScanInfo = getScanInfo(); 167 HStore store = mock(HStore.class); 168 when(store.getScanInfo()).thenReturn(oldScanInfo); 169 ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store); 170 verifyScanInfo(newScanInfo); 171 } 172 173 @Test 174 public void testPostScannerFilterRow() throws IOException { 175 // By default SimpleRegionObserver is set as region coprocessor which implements 176 // postScannerFilterRow 177 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 178 assertTrue(host.hasCustomPostScannerFilterRow(), 179 "Region coprocessor implement postScannerFilterRow"); 180 181 // Set a region CP which doesn't implement postScannerFilterRow 182 init(true); 183 host = new RegionCoprocessorHost(region, rsServices, conf); 184 assertFalse(host.hasCustomPostScannerFilterRow(), 185 "Region coprocessor implement postScannerFilterRow"); 186 187 // Set multiple region CPs, in which one implements postScannerFilterRow 188 init(false); 189 host = new RegionCoprocessorHost(region, rsServices, conf); 190 assertTrue(host.hasCustomPostScannerFilterRow(), 191 "Region coprocessor doesn't implement postScannerFilterRow"); 192 } 193 194 private void verifyScanInfo(ScanInfo newScanInfo) { 195 assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells()); 196 assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions()); 197 assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions()); 198 assertEquals(TTL, newScanInfo.getTtl()); 199 assertEquals(TIME_TO_PURGE_DELETES, newScanInfo.getTimeToPurgeDeletes()); 200 } 201 202 private ScanInfo getScanInfo() { 203 int oldMaxVersions = 1; 204 int oldMinVersions = 0; 205 long oldTTL = 10000; 206 207 return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL, 208 KeepDeletedCells.FALSE, HConstants.FOREVER, 1000, CellComparator.getInstance(), true); 209 } 210 211 /* 212 * Simple region coprocessor which doesn't override postScannerFilterRow 213 */ 214 public static class TempRegionObserver implements RegionCoprocessor, RegionObserver { 215 @Override 216 public Optional<RegionObserver> getRegionObserver() { 217 return Optional.of(this); 218 } 219 } 220}