001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.ConcurrentMap;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Coprocessor;
039import org.apache.hadoop.hbase.CoprocessorEnvironment;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestCase;
042import org.apache.hadoop.hbase.HBaseTestingUtility;
043import org.apache.hadoop.hbase.HColumnDescriptor;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HTableDescriptor;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.regionserver.ChunkCreator;
052import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
053import org.apache.hadoop.hbase.regionserver.HRegion;
054import org.apache.hadoop.hbase.regionserver.InternalScanner;
055import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
056import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
057import org.apache.hadoop.hbase.regionserver.RegionScanner;
058import org.apache.hadoop.hbase.regionserver.RegionServerServices;
059import org.apache.hadoop.hbase.regionserver.ScanType;
060import org.apache.hadoop.hbase.regionserver.ScannerContext;
061import org.apache.hadoop.hbase.regionserver.Store;
062import org.apache.hadoop.hbase.regionserver.StoreFile;
063import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
065import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.junit.ClassRule;
068import org.junit.Rule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.junit.rules.TestName;
072import org.mockito.Mockito;
073
074@Category({CoprocessorTests.class, MediumTests.class})
075public class TestCoprocessorInterface {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestCoprocessorInterface.class);
080
081  @Rule public TestName name = new TestName();
082  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083  static final Path DIR = TEST_UTIL.getDataTestDir();
084
085  private static class CustomScanner implements RegionScanner {
086
087    private RegionScanner delegate;
088
089    public CustomScanner(RegionScanner delegate) {
090      this.delegate = delegate;
091    }
092
093    @Override
094    public boolean next(List<Cell> results) throws IOException {
095      return delegate.next(results);
096    }
097
098    @Override
099    public boolean next(List<Cell> result, ScannerContext scannerContext)
100        throws IOException {
101      return delegate.next(result, scannerContext);
102    }
103
104    @Override
105    public boolean nextRaw(List<Cell> result)
106        throws IOException {
107      return delegate.nextRaw(result);
108    }
109
110    @Override
111    public boolean nextRaw(List<Cell> result, ScannerContext context)
112        throws IOException {
113      return delegate.nextRaw(result, context);
114    }
115
116    @Override
117    public void close() throws IOException {
118      delegate.close();
119    }
120
121    @Override
122    public RegionInfo getRegionInfo() {
123      return delegate.getRegionInfo();
124    }
125
126    @Override
127    public boolean isFilterDone() throws IOException {
128      return delegate.isFilterDone();
129    }
130
131    @Override
132    public boolean reseek(byte[] row) throws IOException {
133      return false;
134    }
135
136    @Override
137    public long getMaxResultSize() {
138      return delegate.getMaxResultSize();
139    }
140
141    @Override
142    public long getMvccReadPoint() {
143      return delegate.getMvccReadPoint();
144    }
145
146    @Override
147    public int getBatch() {
148      return delegate.getBatch();
149    }
150  }
151
152  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
153
154    private boolean startCalled;
155    private boolean stopCalled;
156    private boolean preOpenCalled;
157    private boolean postOpenCalled;
158    private boolean preCloseCalled;
159    private boolean postCloseCalled;
160    private boolean preCompactCalled;
161    private boolean postCompactCalled;
162    private boolean preFlushCalled;
163    private boolean postFlushCalled;
164    private ConcurrentMap<String, Object> sharedData;
165
166    @Override
167    public void start(CoprocessorEnvironment e) {
168      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
169      // using new String here, so that there will be new object on each invocation
170      sharedData.putIfAbsent("test1", new Object());
171      startCalled = true;
172    }
173
174    @Override
175    public void stop(CoprocessorEnvironment e) {
176      sharedData = null;
177      stopCalled = true;
178    }
179
180    @Override
181    public Optional<RegionObserver> getRegionObserver() {
182      return Optional.of(this);
183    }
184
185    @Override
186    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
187      preOpenCalled = true;
188    }
189    @Override
190    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
191      postOpenCalled = true;
192    }
193    @Override
194    public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
195      preCloseCalled = true;
196    }
197    @Override
198    public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
199      postCloseCalled = true;
200    }
201    @Override
202    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
203        Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
204        CompactionRequest request) {
205      preCompactCalled = true;
206      return scanner;
207    }
208    @Override
209    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
210        Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
211        CompactionRequest request) {
212      postCompactCalled = true;
213    }
214
215    @Override
216    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
217        FlushLifeCycleTracker tracker) {
218      preFlushCalled = true;
219    }
220
221    @Override
222    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
223        FlushLifeCycleTracker tracker) {
224      postFlushCalled = true;
225    }
226
227    @Override
228    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
229        final Scan scan, final RegionScanner s) throws IOException {
230      return new CustomScanner(s);
231    }
232
233    boolean wasStarted() {
234      return startCalled;
235    }
236    boolean wasStopped() {
237      return stopCalled;
238    }
239    boolean wasOpened() {
240      return (preOpenCalled && postOpenCalled);
241    }
242    boolean wasClosed() {
243      return (preCloseCalled && postCloseCalled);
244    }
245    boolean wasFlushed() {
246      return (preFlushCalled && postFlushCalled);
247    }
248    boolean wasCompacted() {
249      return (preCompactCalled && postCompactCalled);
250    }
251    Map<String, Object> getSharedData() {
252      return sharedData;
253    }
254  }
255
256  public static class CoprocessorII implements RegionCoprocessor {
257    private ConcurrentMap<String, Object> sharedData;
258
259    @Override
260    public void start(CoprocessorEnvironment e) {
261      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
262      sharedData.putIfAbsent("test2", new Object());
263    }
264
265    @Override
266    public void stop(CoprocessorEnvironment e) {
267      sharedData = null;
268    }
269
270    @Override
271    public Optional<RegionObserver> getRegionObserver() {
272      return Optional.of(new RegionObserver() {
273        @Override
274        public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
275            final Get get, final List<Cell> results) throws IOException {
276          throw new RuntimeException();
277        }
278      });
279    }
280
281    Map<String, Object> getSharedData() {
282      return sharedData;
283    }
284  }
285
286  @Test
287  public void testSharedData() throws IOException {
288    TableName tableName = TableName.valueOf(name.getMethodName());
289    byte [][] families = { fam1, fam2, fam3 };
290
291    Configuration hc = initConfig();
292    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[]{}, families);
293
294    for (int i = 0; i < 3; i++) {
295      HBaseTestCase.addContent(region, fam3);
296      region.flush(true);
297    }
298
299    region.compact(false);
300
301    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
302
303    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
304    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
305    Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
306    Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
307    assertNotNull(o);
308    assertNotNull(o2);
309    // to coprocessors get different sharedDatas
310    assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
311    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
312    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
313    // make sure that all coprocessor of a class have identical sharedDatas
314    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
315    assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
316
317    // now have all Environments fail
318    try {
319      byte [] r = region.getRegionInfo().getStartKey();
320      if (r == null || r.length <= 0) {
321        // Its the start row.  Can't ask for null.  Ask for minimal key instead.
322        r = new byte [] {0};
323      }
324      Get g = new Get(r);
325      region.get(g);
326      fail();
327    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
328    }
329    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
330    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
331    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
332    c = c2 = null;
333    // perform a GC
334    System.gc();
335    // reopen the region
336    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
337    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
338    // CPimpl is unaffected, still the same reference
339    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
340    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
341    // new map and object created, hence the reference is different
342    // hence the old entry was indeed removed by the GC and new one has been created
343    Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
344    assertFalse(o3 == o2);
345    HBaseTestingUtility.closeRegionAndWAL(region);
346  }
347
348  @Test
349  public void testCoprocessorInterface() throws IOException {
350    TableName tableName = TableName.valueOf(name.getMethodName());
351    byte [][] families = { fam1, fam2, fam3 };
352
353    Configuration hc = initConfig();
354    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
355      new Class<?>[]{CoprocessorImpl.class}, families);
356    for (int i = 0; i < 3; i++) {
357      HBaseTestCase.addContent(region, fam3);
358      region.flush(true);
359    }
360
361    region.compact(false);
362
363    // HBASE-4197
364    Scan s = new Scan();
365    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
366    assertTrue(scanner instanceof CustomScanner);
367    // this would throw an exception before HBASE-4197
368    scanner.next(new ArrayList<>());
369
370    HBaseTestingUtility.closeRegionAndWAL(region);
371    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
372
373    assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
374    assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
375    assertTrue(((CoprocessorImpl)c).wasOpened());
376    assertTrue(((CoprocessorImpl)c).wasClosed());
377    assertTrue(((CoprocessorImpl)c).wasFlushed());
378    assertTrue(((CoprocessorImpl)c).wasCompacted());
379  }
380
381  HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
382      throws IOException {
383    //RegionInfo info = new RegionInfo(tableName, null, null, false);
384    HRegion r = HRegion.openHRegion(closedRegion, null);
385
386    // this following piece is a hack. currently a coprocessorHost
387    // is secretly loaded at OpenRegionHandler. we don't really
388    // start a region server here, so just manually create cphost
389    // and set it to region.
390    Configuration conf = TEST_UTIL.getConfiguration();
391    RegionCoprocessorHost host = new RegionCoprocessorHost(r,
392        Mockito.mock(RegionServerServices.class), conf);
393    r.setCoprocessorHost(host);
394
395    for (Class<?> implClass : implClasses) {
396      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
397    }
398    // we need to manually call pre- and postOpen here since the
399    // above load() is not the real case for CP loading. A CP is
400    // expected to be loaded by default from 1) configuration; or 2)
401    // HTableDescriptor. If it's loaded after HRegion initialized,
402    // the pre- and postOpen() won't be triggered automatically.
403    // Here we have to call pre and postOpen explicitly.
404    host.preOpen();
405    host.postOpen();
406    return r;
407  }
408
409  HRegion initHRegion (TableName tableName, String callingMethod,
410      Configuration conf, Class<?> [] implClasses, byte [][] families)
411      throws IOException {
412    HTableDescriptor htd = new HTableDescriptor(tableName);
413    for(byte [] family : families) {
414      htd.addFamily(new HColumnDescriptor(family));
415    }
416    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
417      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
418    RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
419        .setStartKey(null)
420        .setEndKey(null)
421        .setSplit(false)
422        .build();
423    Path path = new Path(DIR + callingMethod);
424    HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
425
426    // this following piece is a hack.
427    RegionCoprocessorHost host =
428        new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
429    r.setCoprocessorHost(host);
430
431    for (Class<?> implClass : implClasses) {
432      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
433      Coprocessor c = host.findCoprocessor(implClass.getName());
434      assertNotNull(c);
435    }
436
437    // Here we have to call pre and postOpen explicitly.
438    host.preOpen();
439    host.postOpen();
440    return r;
441  }
442
443  private Configuration initConfig() {
444    // Always compact if there is more than one store file.
445    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
446    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
447    // Increase the amount of time between client retries
448    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
449    // This size should make it so we always split using the addContent
450    // below.  After adding all data, the first region is 1.3M
451    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
452        1024 * 128);
453    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
454
455    return TEST_UTIL.getConfiguration();
456  }
457}