001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.COPROCESSORS_ENABLED_CONF_KEY; 021import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; 022import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR; 023import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.Optional; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CellComparator; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeepDeletedCells; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.Scan; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.io.TimeRange; 045import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 046import org.apache.hadoop.hbase.regionserver.HRegion; 047import org.apache.hadoop.hbase.regionserver.HStore; 048import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; 049import org.apache.hadoop.hbase.regionserver.RegionServerServices; 050import org.apache.hadoop.hbase.regionserver.ScanInfo; 051import org.apache.hadoop.hbase.regionserver.ScanType; 052import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 053import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.testclassification.SmallTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063 064@Category({ SmallTests.class }) 065public class TestRegionCoprocessorHost { 066 private Configuration conf; 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestRegionCoprocessorHost.class); 071 072 @Rule 073 public final TestName name = new TestName(); 074 private RegionInfo regionInfo; 075 private HRegion region; 076 private RegionServerServices rsServices; 077 public static final int MAX_VERSIONS = 3; 078 public static final int MIN_VERSIONS = 2; 079 public static final int TTL = 1000; 080 public static final int TIME_TO_PURGE_DELETES = 2000; 081 082 @Before 083 public void setup() throws IOException { 084 init(null); 085 } 086 087 private void init(Boolean flag) throws IOException { 088 conf = HBaseConfiguration.create(); 089 conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true); 090 conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true); 091 TableName tableName = TableName.valueOf(name.getMethodName()); 092 regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); 093 TableDescriptor tableDesc = null; 094 if (flag == null) { 095 // configure a coprocessor which override postScannerFilterRow 096 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 097 .setCoprocessor(SimpleRegionObserver.class.getName()).build(); 098 } else if (flag) { 099 // configure a coprocessor which don't override postScannerFilterRow 100 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 101 .setCoprocessor(TempRegionObserver.class.getName()).build(); 102 } else { 103 // configure two coprocessors, one don't override postScannerFilterRow but another one does 104 conf.set(REGION_COPROCESSOR_CONF_KEY, TempRegionObserver.class.getName()); 105 tableDesc = TableDescriptorBuilder.newBuilder(tableName) 106 .setCoprocessor(SimpleRegionObserver.class.getName()).build(); 107 } 108 region = mock(HRegion.class); 109 when(region.getRegionInfo()).thenReturn(regionInfo); 110 when(region.getTableDescriptor()).thenReturn(tableDesc); 111 rsServices = mock(RegionServerServices.class); 112 } 113 114 @Test 115 public void testLoadDuplicateCoprocessor() throws Exception { 116 conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true); 117 conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName()); 118 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 119 // Only one coprocessor SimpleRegionObserver loaded 120 assertEquals(1, host.coprocEnvironments.size()); 121 122 // Allow to load duplicate coprocessor 123 conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, false); 124 host = new RegionCoprocessorHost(region, rsServices, conf); 125 // Two duplicate coprocessors loaded 126 assertEquals(2, host.coprocEnvironments.size()); 127 } 128 129 @Test 130 public void testPreStoreScannerOpen() throws IOException { 131 132 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 133 Scan scan = new Scan(); 134 scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP); 135 assertTrue("Scan is not for all time", scan.getTimeRange().isAllTime()); 136 // SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan 137 // is for all time. this lets us exercise both that the Scan is wired up properly in the coproc 138 // and that we can customize the metadata 139 140 ScanInfo oldScanInfo = getScanInfo(); 141 142 HStore store = mock(HStore.class); 143 when(store.getScanInfo()).thenReturn(oldScanInfo); 144 ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan); 145 146 verifyScanInfo(newScanInfo); 147 } 148 149 @Test 150 public void testPreCompactScannerOpen() throws IOException { 151 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 152 ScanInfo oldScanInfo = getScanInfo(); 153 HStore store = mock(HStore.class); 154 when(store.getScanInfo()).thenReturn(oldScanInfo); 155 ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES, 156 mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class)); 157 verifyScanInfo(newScanInfo); 158 } 159 160 @Test 161 public void testPreFlushScannerOpen() throws IOException { 162 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 163 ScanInfo oldScanInfo = getScanInfo(); 164 HStore store = mock(HStore.class); 165 when(store.getScanInfo()).thenReturn(oldScanInfo); 166 ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class)); 167 verifyScanInfo(newScanInfo); 168 } 169 170 @Test 171 public void testPreMemStoreCompactionCompactScannerOpen() throws IOException { 172 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 173 ScanInfo oldScanInfo = getScanInfo(); 174 HStore store = mock(HStore.class); 175 when(store.getScanInfo()).thenReturn(oldScanInfo); 176 ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store); 177 verifyScanInfo(newScanInfo); 178 } 179 180 @Test 181 public void testPostScannerFilterRow() throws IOException { 182 // By default SimpleRegionObserver is set as region coprocessor which implements 183 // postScannerFilterRow 184 RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf); 185 assertTrue("Region coprocessor implement postScannerFilterRow", 186 host.hasCustomPostScannerFilterRow()); 187 188 // Set a region CP which doesn't implement postScannerFilterRow 189 init(true); 190 host = new RegionCoprocessorHost(region, rsServices, conf); 191 assertFalse("Region coprocessor implement postScannerFilterRow", 192 host.hasCustomPostScannerFilterRow()); 193 194 // Set multiple region CPs, in which one implements postScannerFilterRow 195 init(false); 196 host = new RegionCoprocessorHost(region, rsServices, conf); 197 assertTrue("Region coprocessor doesn't implement postScannerFilterRow", 198 host.hasCustomPostScannerFilterRow()); 199 } 200 201 private void verifyScanInfo(ScanInfo newScanInfo) { 202 assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells()); 203 assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions()); 204 assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions()); 205 assertEquals(TTL, newScanInfo.getTtl()); 206 assertEquals(TIME_TO_PURGE_DELETES, newScanInfo.getTimeToPurgeDeletes()); 207 } 208 209 private ScanInfo getScanInfo() { 210 int oldMaxVersions = 1; 211 int oldMinVersions = 0; 212 long oldTTL = 10000; 213 214 return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL, 215 KeepDeletedCells.FALSE, HConstants.FOREVER, 1000, CellComparator.getInstance(), true); 216 } 217 218 /* 219 * Simple region coprocessor which doesn't override postScannerFilterRow 220 */ 221 public static class TempRegionObserver implements RegionCoprocessor, RegionObserver { 222 @Override 223 public Optional<RegionObserver> getRegionObserver() { 224 return Optional.of(this); 225 } 226 } 227}