001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam3;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotNull;
025import static org.junit.jupiter.api.Assertions.assertNull;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.Set;
035import java.util.concurrent.ConcurrentMap;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.Coprocessor;
040import org.apache.hadoop.hbase.CoprocessorEnvironment;
041import org.apache.hadoop.hbase.ExtendedCell;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HTestConst;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.regionserver.ChunkCreator;
053import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
054import org.apache.hadoop.hbase.regionserver.HRegion;
055import org.apache.hadoop.hbase.regionserver.InternalScanner;
056import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
057import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
058import org.apache.hadoop.hbase.regionserver.RegionScanner;
059import org.apache.hadoop.hbase.regionserver.RegionServerServices;
060import org.apache.hadoop.hbase.regionserver.ScanType;
061import org.apache.hadoop.hbase.regionserver.ScannerContext;
062import org.apache.hadoop.hbase.regionserver.Store;
063import org.apache.hadoop.hbase.regionserver.StoreFile;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
065import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
066import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.junit.jupiter.api.BeforeEach;
069import org.junit.jupiter.api.Tag;
070import org.junit.jupiter.api.Test;
071import org.junit.jupiter.api.TestInfo;
072import org.mockito.Mockito;
073
074@Tag(CoprocessorTests.TAG)
075@Tag(MediumTests.TAG)
076public class TestCoprocessorInterface {
077
078  private String currentTestName;
079  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
080  static final Path DIR = TEST_UTIL.getDataTestDir();
081
082  @BeforeEach
083  public void setUp(TestInfo testInfo) {
084    currentTestName = testInfo.getTestMethod().get().getName();
085  }
086
087  private static class CustomScanner implements RegionScanner {
088
089    private RegionScanner delegate;
090
091    public CustomScanner(RegionScanner delegate) {
092      this.delegate = delegate;
093    }
094
095    @Override
096    public boolean next(List<? super ExtendedCell> results) throws IOException {
097      return delegate.next(results);
098    }
099
100    @Override
101    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
102      throws IOException {
103      return delegate.next(result, scannerContext);
104    }
105
106    @Override
107    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
108      return delegate.nextRaw(result);
109    }
110
111    @Override
112    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
113      throws IOException {
114      return delegate.nextRaw(result, context);
115    }
116
117    @Override
118    public Set<Path> getFilesRead() {
119      return delegate.getFilesRead();
120    }
121
122    @Override
123    public void close() throws IOException {
124      delegate.close();
125    }
126
127    @Override
128    public RegionInfo getRegionInfo() {
129      return delegate.getRegionInfo();
130    }
131
132    @Override
133    public boolean isFilterDone() throws IOException {
134      return delegate.isFilterDone();
135    }
136
137    @Override
138    public boolean reseek(byte[] row) throws IOException {
139      return false;
140    }
141
142    @Override
143    public long getMaxResultSize() {
144      return delegate.getMaxResultSize();
145    }
146
147    @Override
148    public long getMvccReadPoint() {
149      return delegate.getMvccReadPoint();
150    }
151
152    @Override
153    public int getBatch() {
154      return delegate.getBatch();
155    }
156  }
157
158  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
159
160    private boolean startCalled;
161    private boolean stopCalled;
162    private boolean preOpenCalled;
163    private boolean postOpenCalled;
164    private boolean preCloseCalled;
165    private boolean postCloseCalled;
166    private boolean preCompactCalled;
167    private boolean postCompactCalled;
168    private boolean preFlushCalled;
169    private boolean postFlushCalled;
170    private ConcurrentMap<String, Object> sharedData;
171
172    @Override
173    public void start(CoprocessorEnvironment e) {
174      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
175      // using new String here, so that there will be new object on each invocation
176      sharedData.putIfAbsent("test1", new Object());
177      startCalled = true;
178    }
179
180    @Override
181    public void stop(CoprocessorEnvironment e) {
182      sharedData = null;
183      stopCalled = true;
184    }
185
186    @Override
187    public Optional<RegionObserver> getRegionObserver() {
188      return Optional.of(this);
189    }
190
191    @Override
192    public void preOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
193      preOpenCalled = true;
194    }
195
196    @Override
197    public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
198      postOpenCalled = true;
199    }
200
201    @Override
202    public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
203      boolean abortRequested) {
204      preCloseCalled = true;
205    }
206
207    @Override
208    public void postClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
209      boolean abortRequested) {
210      postCloseCalled = true;
211    }
212
213    @Override
214    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
215      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
216      CompactionRequest request) {
217      preCompactCalled = true;
218      return scanner;
219    }
220
221    @Override
222    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e, Store store,
223      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
224      postCompactCalled = true;
225    }
226
227    @Override
228    public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
229      FlushLifeCycleTracker tracker) {
230      preFlushCalled = true;
231    }
232
233    @Override
234    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
235      FlushLifeCycleTracker tracker) {
236      postFlushCalled = true;
237    }
238
239    @Override
240    public RegionScanner postScannerOpen(
241      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan,
242      final RegionScanner s) throws IOException {
243      return new CustomScanner(s);
244    }
245
246    boolean wasStarted() {
247      return startCalled;
248    }
249
250    boolean wasStopped() {
251      return stopCalled;
252    }
253
254    boolean wasOpened() {
255      return (preOpenCalled && postOpenCalled);
256    }
257
258    boolean wasClosed() {
259      return (preCloseCalled && postCloseCalled);
260    }
261
262    boolean wasFlushed() {
263      return (preFlushCalled && postFlushCalled);
264    }
265
266    boolean wasCompacted() {
267      return (preCompactCalled && postCompactCalled);
268    }
269
270    Map<String, Object> getSharedData() {
271      return sharedData;
272    }
273  }
274
275  public static class CoprocessorII implements RegionCoprocessor {
276    private ConcurrentMap<String, Object> sharedData;
277
278    @Override
279    public void start(CoprocessorEnvironment e) {
280      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
281      sharedData.putIfAbsent("test2", new Object());
282    }
283
284    @Override
285    public void stop(CoprocessorEnvironment e) {
286      sharedData = null;
287    }
288
289    @Override
290    public Optional<RegionObserver> getRegionObserver() {
291      return Optional.of(new RegionObserver() {
292        @Override
293        public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
294          final Get get, final List<Cell> results) throws IOException {
295          throw new RuntimeException();
296        }
297      });
298    }
299
300    Map<String, Object> getSharedData() {
301      return sharedData;
302    }
303  }
304
305  @Test
306  public void testSharedData() throws IOException {
307    TableName tableName = TableName.valueOf(currentTestName);
308    byte[][] families = { fam1, fam2, fam3 };
309
310    Configuration hc = initConfig();
311    HRegion region = initHRegion(tableName, currentTestName, hc, new Class<?>[] {}, families);
312
313    for (int i = 0; i < 3; i++) {
314      HTestConst.addContent(region, fam3);
315      region.flush(true);
316    }
317
318    region.compact(false);
319
320    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
321
322    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
323    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
324    Object o = ((CoprocessorImpl) c).getSharedData().get("test1");
325    Object o2 = ((CoprocessorII) c2).getSharedData().get("test2");
326    assertNotNull(o);
327    assertNotNull(o2);
328    // to coprocessors get different sharedDatas
329    assertFalse(((CoprocessorImpl) c).getSharedData() == ((CoprocessorII) c2).getSharedData());
330    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
331    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
332    // make sure that all coprocessor of a class have identical sharedDatas
333    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
334    assertTrue(((CoprocessorII) c2).getSharedData().get("test2") == o2);
335
336    // now have all Environments fail
337    try {
338      byte[] r = region.getRegionInfo().getStartKey();
339      if (r == null || r.length <= 0) {
340        // Its the start row. Can't ask for null. Ask for minimal key instead.
341        r = new byte[] { 0 };
342      }
343      Get g = new Get(r);
344      region.get(g);
345      fail();
346    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
347    }
348    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
349    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
350    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
351    c = c2 = null;
352    // perform a GC
353    System.gc();
354    // reopen the region
355    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
356    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
357    // CPimpl is unaffected, still the same reference
358    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
359    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
360    // new map and object created, hence the reference is different
361    // hence the old entry was indeed removed by the GC and new one has been created
362    Object o3 = ((CoprocessorII) c2).getSharedData().get("test2");
363    assertFalse(o3 == o2);
364    HBaseTestingUtil.closeRegionAndWAL(region);
365  }
366
367  @Test
368  public void testCoprocessorInterface() throws IOException {
369    TableName tableName = TableName.valueOf(currentTestName);
370    byte[][] families = { fam1, fam2, fam3 };
371
372    Configuration hc = initConfig();
373    HRegion region = initHRegion(tableName, currentTestName, hc,
374      new Class<?>[] { CoprocessorImpl.class }, families);
375    for (int i = 0; i < 3; i++) {
376      HTestConst.addContent(region, fam3);
377      region.flush(true);
378    }
379
380    region.compact(false);
381
382    // HBASE-4197
383    Scan s = new Scan();
384    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
385    assertTrue(scanner instanceof CustomScanner);
386    // this would throw an exception before HBASE-4197
387    scanner.next(new ArrayList<>());
388
389    HBaseTestingUtil.closeRegionAndWAL(region);
390    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
391
392    assertTrue(((CoprocessorImpl) c).wasStarted(), "Coprocessor not started");
393    assertTrue(((CoprocessorImpl) c).wasStopped(), "Coprocessor not stopped");
394    assertTrue(((CoprocessorImpl) c).wasOpened());
395    assertTrue(((CoprocessorImpl) c).wasClosed());
396    assertTrue(((CoprocessorImpl) c).wasFlushed());
397    assertTrue(((CoprocessorImpl) c).wasCompacted());
398  }
399
400  HRegion reopenRegion(final HRegion closedRegion, Class<?>... implClasses) throws IOException {
401    // RegionInfo info = new RegionInfo(tableName, null, null, false);
402    HRegion r = HRegion.openHRegion(closedRegion, null);
403
404    // this following piece is a hack. currently a coprocessorHost
405    // is secretly loaded at OpenRegionHandler. we don't really
406    // start a region server here, so just manually create cphost
407    // and set it to region.
408    Configuration conf = TEST_UTIL.getConfiguration();
409    RegionCoprocessorHost host =
410      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
411    r.setCoprocessorHost(host);
412
413    for (Class<?> implClass : implClasses) {
414      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
415    }
416    // we need to manually call pre- and postOpen here since the
417    // above load() is not the real case for CP loading. A CP is
418    // expected to be loaded by default from 1) configuration; or 2)
419    // HTableDescriptor. If it's loaded after HRegion initialized,
420    // the pre- and postOpen() won't be triggered automatically.
421    // Here we have to call pre and postOpen explicitly.
422    host.preOpen();
423    host.postOpen();
424    return r;
425  }
426
427  HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
428    Class<?>[] implClasses, byte[][] families) throws IOException {
429    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
430    for (byte[] family : families) {
431      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
432    }
433    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
434      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
435    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(null).setEndKey(null)
436      .setSplit(false).build();
437    Path path = new Path(DIR + callingMethod);
438    HRegion r = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
439
440    // this following piece is a hack.
441    RegionCoprocessorHost host =
442      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
443    r.setCoprocessorHost(host);
444
445    for (Class<?> implClass : implClasses) {
446      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
447      Coprocessor c = host.findCoprocessor(implClass.getName());
448      assertNotNull(c);
449    }
450
451    // Here we have to call pre and postOpen explicitly.
452    host.preOpen();
453    host.postOpen();
454    return r;
455  }
456
457  private Configuration initConfig() {
458    // Always compact if there is more than one store file.
459    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
460    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
461    // Increase the amount of time between client retries
462    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
463    // This size should make it so we always split using the addContent
464    // below. After adding all data, the first region is 1.3M
465    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
466    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
467
468    return TEST_UTIL.getConfiguration();
469  }
470}