001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.COPROCESSORS_ENABLED_CONF_KEY;
021import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
022import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR;
023import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.Optional;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeepDeletedCells;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.io.TimeRange;
044import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
045import org.apache.hadoop.hbase.regionserver.HRegion;
046import org.apache.hadoop.hbase.regionserver.HStore;
047import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
048import org.apache.hadoop.hbase.regionserver.RegionServerServices;
049import org.apache.hadoop.hbase.regionserver.ScanInfo;
050import org.apache.hadoop.hbase.regionserver.ScanType;
051import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
052import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.testclassification.SmallTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.junit.jupiter.api.BeforeEach;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.junit.jupiter.api.TestInfo;
060
061@Tag(SmallTests.TAG)
062public class TestRegionCoprocessorHost {
063  private Configuration conf;
064
065  private String currentTestName;
066  private RegionInfo regionInfo;
067  private HRegion region;
068  private RegionServerServices rsServices;
069  public static final int MAX_VERSIONS = 3;
070  public static final int MIN_VERSIONS = 2;
071  public static final int TTL = 1000;
072  public static final int TIME_TO_PURGE_DELETES = 2000;
073
074  @BeforeEach
075  public void setup(TestInfo testInfo) throws IOException {
076    currentTestName = testInfo.getTestMethod().get().getName();
077    init(null);
078  }
079
080  private void init(Boolean flag) throws IOException {
081    conf = HBaseConfiguration.create();
082    conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true);
083    conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true);
084    TableName tableName = TableName.valueOf(currentTestName);
085    regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
086    TableDescriptor tableDesc = null;
087    if (flag == null) {
088      // configure a coprocessor which override postScannerFilterRow
089      tableDesc = TableDescriptorBuilder.newBuilder(tableName)
090        .setCoprocessor(SimpleRegionObserver.class.getName()).build();
091    } else if (flag) {
092      // configure a coprocessor which don't override postScannerFilterRow
093      tableDesc = TableDescriptorBuilder.newBuilder(tableName)
094        .setCoprocessor(TempRegionObserver.class.getName()).build();
095    } else {
096      // configure two coprocessors, one don't override postScannerFilterRow but another one does
097      conf.set(REGION_COPROCESSOR_CONF_KEY, TempRegionObserver.class.getName());
098      tableDesc = TableDescriptorBuilder.newBuilder(tableName)
099        .setCoprocessor(SimpleRegionObserver.class.getName()).build();
100    }
101    region = mock(HRegion.class);
102    when(region.getRegionInfo()).thenReturn(regionInfo);
103    when(region.getTableDescriptor()).thenReturn(tableDesc);
104    rsServices = mock(RegionServerServices.class);
105  }
106
107  @Test
108  public void testLoadDuplicateCoprocessor() throws Exception {
109    conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
110    conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
111    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
112    // Only one coprocessor SimpleRegionObserver loaded
113    assertEquals(1, host.coprocEnvironments.size());
114
115    // Allow to load duplicate coprocessor
116    conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, false);
117    host = new RegionCoprocessorHost(region, rsServices, conf);
118    // Two duplicate coprocessors loaded
119    assertEquals(2, host.coprocEnvironments.size());
120  }
121
122  @Test
123  public void testPreStoreScannerOpen() throws IOException {
124
125    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
126    Scan scan = new Scan();
127    scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP);
128    assertTrue(scan.getTimeRange().isAllTime(), "Scan is not for all time");
129    // SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan
130    // is for all time. this lets us exercise both that the Scan is wired up properly in the coproc
131    // and that we can customize the metadata
132
133    ScanInfo oldScanInfo = getScanInfo();
134
135    HStore store = mock(HStore.class);
136    when(store.getScanInfo()).thenReturn(oldScanInfo);
137    ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan);
138
139    verifyScanInfo(newScanInfo);
140  }
141
142  @Test
143  public void testPreCompactScannerOpen() throws IOException {
144    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
145    ScanInfo oldScanInfo = getScanInfo();
146    HStore store = mock(HStore.class);
147    when(store.getScanInfo()).thenReturn(oldScanInfo);
148    ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES,
149      mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class));
150    verifyScanInfo(newScanInfo);
151  }
152
153  @Test
154  public void testPreFlushScannerOpen() throws IOException {
155    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
156    ScanInfo oldScanInfo = getScanInfo();
157    HStore store = mock(HStore.class);
158    when(store.getScanInfo()).thenReturn(oldScanInfo);
159    ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class));
160    verifyScanInfo(newScanInfo);
161  }
162
163  @Test
164  public void testPreMemStoreCompactionCompactScannerOpen() throws IOException {
165    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
166    ScanInfo oldScanInfo = getScanInfo();
167    HStore store = mock(HStore.class);
168    when(store.getScanInfo()).thenReturn(oldScanInfo);
169    ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store);
170    verifyScanInfo(newScanInfo);
171  }
172
173  @Test
174  public void testPostScannerFilterRow() throws IOException {
175    // By default SimpleRegionObserver is set as region coprocessor which implements
176    // postScannerFilterRow
177    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
178    assertTrue(host.hasCustomPostScannerFilterRow(),
179      "Region coprocessor implement postScannerFilterRow");
180
181    // Set a region CP which doesn't implement postScannerFilterRow
182    init(true);
183    host = new RegionCoprocessorHost(region, rsServices, conf);
184    assertFalse(host.hasCustomPostScannerFilterRow(),
185      "Region coprocessor implement postScannerFilterRow");
186
187    // Set multiple region CPs, in which one implements postScannerFilterRow
188    init(false);
189    host = new RegionCoprocessorHost(region, rsServices, conf);
190    assertTrue(host.hasCustomPostScannerFilterRow(),
191      "Region coprocessor doesn't implement postScannerFilterRow");
192  }
193
194  private void verifyScanInfo(ScanInfo newScanInfo) {
195    assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells());
196    assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions());
197    assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions());
198    assertEquals(TTL, newScanInfo.getTtl());
199    assertEquals(TIME_TO_PURGE_DELETES, newScanInfo.getTimeToPurgeDeletes());
200  }
201
202  private ScanInfo getScanInfo() {
203    int oldMaxVersions = 1;
204    int oldMinVersions = 0;
205    long oldTTL = 10000;
206
207    return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL,
208      KeepDeletedCells.FALSE, HConstants.FOREVER, 1000, CellComparator.getInstance(), true);
209  }
210
211  /*
212   * Simple region coprocessor which doesn't override postScannerFilterRow
213   */
214  public static class TempRegionObserver implements RegionCoprocessor, RegionObserver {
215    @Override
216    public Optional<RegionObserver> getRegionObserver() {
217      return Optional.of(this);
218    }
219  }
220}