001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.ConcurrentMap;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Coprocessor;
039import org.apache.hadoop.hbase.CoprocessorEnvironment;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HColumnDescriptor;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HTableDescriptor;
045import org.apache.hadoop.hbase.HTestConst;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.regionserver.ChunkCreator;
052import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
053import org.apache.hadoop.hbase.regionserver.HRegion;
054import org.apache.hadoop.hbase.regionserver.InternalScanner;
055import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
056import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
057import org.apache.hadoop.hbase.regionserver.RegionScanner;
058import org.apache.hadoop.hbase.regionserver.RegionServerServices;
059import org.apache.hadoop.hbase.regionserver.ScanType;
060import org.apache.hadoop.hbase.regionserver.ScannerContext;
061import org.apache.hadoop.hbase.regionserver.Store;
062import org.apache.hadoop.hbase.regionserver.StoreFile;
063import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
065import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.junit.ClassRule;
068import org.junit.Rule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.junit.rules.TestName;
072import org.mockito.Mockito;
073
074@Category({ CoprocessorTests.class, MediumTests.class })
075public class TestCoprocessorInterface {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079    HBaseClassTestRule.forClass(TestCoprocessorInterface.class);
080
081  @Rule
082  public TestName name = new TestName();
083  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084  static final Path DIR = TEST_UTIL.getDataTestDir();
085
086  private static class CustomScanner implements RegionScanner {
087
088    private RegionScanner delegate;
089
090    public CustomScanner(RegionScanner delegate) {
091      this.delegate = delegate;
092    }
093
094    @Override
095    public boolean next(List<Cell> results) throws IOException {
096      return delegate.next(results);
097    }
098
099    @Override
100    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
101      return delegate.next(result, scannerContext);
102    }
103
104    @Override
105    public boolean nextRaw(List<Cell> result) throws IOException {
106      return delegate.nextRaw(result);
107    }
108
109    @Override
110    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
111      return delegate.nextRaw(result, context);
112    }
113
114    @Override
115    public void close() throws IOException {
116      delegate.close();
117    }
118
119    @Override
120    public RegionInfo getRegionInfo() {
121      return delegate.getRegionInfo();
122    }
123
124    @Override
125    public boolean isFilterDone() throws IOException {
126      return delegate.isFilterDone();
127    }
128
129    @Override
130    public boolean reseek(byte[] row) throws IOException {
131      return false;
132    }
133
134    @Override
135    public long getMaxResultSize() {
136      return delegate.getMaxResultSize();
137    }
138
139    @Override
140    public long getMvccReadPoint() {
141      return delegate.getMvccReadPoint();
142    }
143
144    @Override
145    public int getBatch() {
146      return delegate.getBatch();
147    }
148  }
149
150  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
151
152    private boolean startCalled;
153    private boolean stopCalled;
154    private boolean preOpenCalled;
155    private boolean postOpenCalled;
156    private boolean preCloseCalled;
157    private boolean postCloseCalled;
158    private boolean preCompactCalled;
159    private boolean postCompactCalled;
160    private boolean preFlushCalled;
161    private boolean postFlushCalled;
162    private ConcurrentMap<String, Object> sharedData;
163
164    @Override
165    public void start(CoprocessorEnvironment e) {
166      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
167      // using new String here, so that there will be new object on each invocation
168      sharedData.putIfAbsent("test1", new Object());
169      startCalled = true;
170    }
171
172    @Override
173    public void stop(CoprocessorEnvironment e) {
174      sharedData = null;
175      stopCalled = true;
176    }
177
178    @Override
179    public Optional<RegionObserver> getRegionObserver() {
180      return Optional.of(this);
181    }
182
183    @Override
184    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
185      preOpenCalled = true;
186    }
187
188    @Override
189    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
190      postOpenCalled = true;
191    }
192
193    @Override
194    public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
195      preCloseCalled = true;
196    }
197
198    @Override
199    public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
200      postCloseCalled = true;
201    }
202
203    @Override
204    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
205      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
206      CompactionRequest request) {
207      preCompactCalled = true;
208      return scanner;
209    }
210
211    @Override
212    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
213      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
214      postCompactCalled = true;
215    }
216
217    @Override
218    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
219      FlushLifeCycleTracker tracker) {
220      preFlushCalled = true;
221    }
222
223    @Override
224    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
225      FlushLifeCycleTracker tracker) {
226      postFlushCalled = true;
227    }
228
229    @Override
230    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
231      final Scan scan, final RegionScanner s) throws IOException {
232      return new CustomScanner(s);
233    }
234
235    boolean wasStarted() {
236      return startCalled;
237    }
238
239    boolean wasStopped() {
240      return stopCalled;
241    }
242
243    boolean wasOpened() {
244      return (preOpenCalled && postOpenCalled);
245    }
246
247    boolean wasClosed() {
248      return (preCloseCalled && postCloseCalled);
249    }
250
251    boolean wasFlushed() {
252      return (preFlushCalled && postFlushCalled);
253    }
254
255    boolean wasCompacted() {
256      return (preCompactCalled && postCompactCalled);
257    }
258
259    Map<String, Object> getSharedData() {
260      return sharedData;
261    }
262  }
263
264  public static class CoprocessorII implements RegionCoprocessor {
265    private ConcurrentMap<String, Object> sharedData;
266
267    @Override
268    public void start(CoprocessorEnvironment e) {
269      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
270      sharedData.putIfAbsent("test2", new Object());
271    }
272
273    @Override
274    public void stop(CoprocessorEnvironment e) {
275      sharedData = null;
276    }
277
278    @Override
279    public Optional<RegionObserver> getRegionObserver() {
280      return Optional.of(new RegionObserver() {
281        @Override
282        public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
283          final List<Cell> results) throws IOException {
284          throw new RuntimeException();
285        }
286      });
287    }
288
289    Map<String, Object> getSharedData() {
290      return sharedData;
291    }
292  }
293
294  @Test
295  public void testSharedData() throws IOException {
296    TableName tableName = TableName.valueOf(name.getMethodName());
297    byte[][] families = { fam1, fam2, fam3 };
298
299    Configuration hc = initConfig();
300    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[] {}, families);
301
302    for (int i = 0; i < 3; i++) {
303      HTestConst.addContent(region, fam3);
304      region.flush(true);
305    }
306
307    region.compact(false);
308
309    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
310
311    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
312    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
313    Object o = ((CoprocessorImpl) c).getSharedData().get("test1");
314    Object o2 = ((CoprocessorII) c2).getSharedData().get("test2");
315    assertNotNull(o);
316    assertNotNull(o2);
317    // to coprocessors get different sharedDatas
318    assertFalse(((CoprocessorImpl) c).getSharedData() == ((CoprocessorII) c2).getSharedData());
319    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
320    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
321    // make sure that all coprocessor of a class have identical sharedDatas
322    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
323    assertTrue(((CoprocessorII) c2).getSharedData().get("test2") == o2);
324
325    // now have all Environments fail
326    try {
327      byte[] r = region.getRegionInfo().getStartKey();
328      if (r == null || r.length <= 0) {
329        // Its the start row. Can't ask for null. Ask for minimal key instead.
330        r = new byte[] { 0 };
331      }
332      Get g = new Get(r);
333      region.get(g);
334      fail();
335    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
336    }
337    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
338    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
339    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
340    c = c2 = null;
341    // perform a GC
342    System.gc();
343    // reopen the region
344    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
345    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
346    // CPimpl is unaffected, still the same reference
347    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
348    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
349    // new map and object created, hence the reference is different
350    // hence the old entry was indeed removed by the GC and new one has been created
351    Object o3 = ((CoprocessorII) c2).getSharedData().get("test2");
352    assertFalse(o3 == o2);
353    HBaseTestingUtility.closeRegionAndWAL(region);
354  }
355
356  @Test
357  public void testCoprocessorInterface() throws IOException {
358    TableName tableName = TableName.valueOf(name.getMethodName());
359    byte[][] families = { fam1, fam2, fam3 };
360
361    Configuration hc = initConfig();
362    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
363      new Class<?>[] { CoprocessorImpl.class }, families);
364    for (int i = 0; i < 3; i++) {
365      HTestConst.addContent(region, fam3);
366      region.flush(true);
367    }
368
369    region.compact(false);
370
371    // HBASE-4197
372    Scan s = new Scan();
373    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
374    assertTrue(scanner instanceof CustomScanner);
375    // this would throw an exception before HBASE-4197
376    scanner.next(new ArrayList<>());
377
378    HBaseTestingUtility.closeRegionAndWAL(region);
379    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
380
381    assertTrue("Coprocessor not started", ((CoprocessorImpl) c).wasStarted());
382    assertTrue("Coprocessor not stopped", ((CoprocessorImpl) c).wasStopped());
383    assertTrue(((CoprocessorImpl) c).wasOpened());
384    assertTrue(((CoprocessorImpl) c).wasClosed());
385    assertTrue(((CoprocessorImpl) c).wasFlushed());
386    assertTrue(((CoprocessorImpl) c).wasCompacted());
387  }
388
389  HRegion reopenRegion(final HRegion closedRegion, Class<?>... implClasses) throws IOException {
390    // RegionInfo info = new RegionInfo(tableName, null, null, false);
391    HRegion r = HRegion.openHRegion(closedRegion, null);
392
393    // this following piece is a hack. currently a coprocessorHost
394    // is secretly loaded at OpenRegionHandler. we don't really
395    // start a region server here, so just manually create cphost
396    // and set it to region.
397    Configuration conf = TEST_UTIL.getConfiguration();
398    RegionCoprocessorHost host =
399      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
400    r.setCoprocessorHost(host);
401
402    for (Class<?> implClass : implClasses) {
403      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
404    }
405    // we need to manually call pre- and postOpen here since the
406    // above load() is not the real case for CP loading. A CP is
407    // expected to be loaded by default from 1) configuration; or 2)
408    // HTableDescriptor. If it's loaded after HRegion initialized,
409    // the pre- and postOpen() won't be triggered automatically.
410    // Here we have to call pre and postOpen explicitly.
411    host.preOpen();
412    host.postOpen();
413    return r;
414  }
415
416  HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
417    Class<?>[] implClasses, byte[][] families) throws IOException {
418    HTableDescriptor htd = new HTableDescriptor(tableName);
419    for (byte[] family : families) {
420      htd.addFamily(new HColumnDescriptor(family));
421    }
422    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
423      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
424    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(null).setEndKey(null)
425      .setSplit(false).build();
426    Path path = new Path(DIR + callingMethod);
427    HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
428
429    // this following piece is a hack.
430    RegionCoprocessorHost host =
431      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
432    r.setCoprocessorHost(host);
433
434    for (Class<?> implClass : implClasses) {
435      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
436      Coprocessor c = host.findCoprocessor(implClass.getName());
437      assertNotNull(c);
438    }
439
440    // Here we have to call pre and postOpen explicitly.
441    host.preOpen();
442    host.postOpen();
443    return r;
444  }
445
446  private Configuration initConfig() {
447    // Always compact if there is more than one store file.
448    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
449    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
450    // Increase the amount of time between client retries
451    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
452    // This size should make it so we always split using the addContent
453    // below. After adding all data, the first region is 1.3M
454    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
455    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
456
457    return TEST_UTIL.getConfiguration();
458  }
459}