001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.ConcurrentMap;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Coprocessor;
039import org.apache.hadoop.hbase.CoprocessorEnvironment;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestCase;
042import org.apache.hadoop.hbase.HBaseTestingUtility;
043import org.apache.hadoop.hbase.HColumnDescriptor;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HTableDescriptor;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.regionserver.ChunkCreator;
052import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
053import org.apache.hadoop.hbase.regionserver.HRegion;
054import org.apache.hadoop.hbase.regionserver.InternalScanner;
055import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
056import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
057import org.apache.hadoop.hbase.regionserver.RegionScanner;
058import org.apache.hadoop.hbase.regionserver.RegionServerServices;
059import org.apache.hadoop.hbase.regionserver.ScanType;
060import org.apache.hadoop.hbase.regionserver.ScannerContext;
061import org.apache.hadoop.hbase.regionserver.Store;
062import org.apache.hadoop.hbase.regionserver.StoreFile;
063import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
065import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.junit.ClassRule;
068import org.junit.Rule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.junit.rules.TestName;
072import org.mockito.Mockito;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076@Category({CoprocessorTests.class, MediumTests.class})
077public class TestCoprocessorInterface {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081      HBaseClassTestRule.forClass(TestCoprocessorInterface.class);
082
083  @Rule public TestName name = new TestName();
084  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorInterface.class);
085  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
086  static final Path DIR = TEST_UTIL.getDataTestDir();
087
088  private static class CustomScanner implements RegionScanner {
089
090    private RegionScanner delegate;
091
092    public CustomScanner(RegionScanner delegate) {
093      this.delegate = delegate;
094    }
095
096    @Override
097    public boolean next(List<Cell> results) throws IOException {
098      return delegate.next(results);
099    }
100
101    @Override
102    public boolean next(List<Cell> result, ScannerContext scannerContext)
103        throws IOException {
104      return delegate.next(result, scannerContext);
105    }
106
107    @Override
108    public boolean nextRaw(List<Cell> result)
109        throws IOException {
110      return delegate.nextRaw(result);
111    }
112
113    @Override
114    public boolean nextRaw(List<Cell> result, ScannerContext context)
115        throws IOException {
116      return delegate.nextRaw(result, context);
117    }
118
119    @Override
120    public void close() throws IOException {
121      delegate.close();
122    }
123
124    @Override
125    public RegionInfo getRegionInfo() {
126      return delegate.getRegionInfo();
127    }
128
129    @Override
130    public boolean isFilterDone() throws IOException {
131      return delegate.isFilterDone();
132    }
133
134    @Override
135    public boolean reseek(byte[] row) throws IOException {
136      return false;
137    }
138
139    @Override
140    public long getMaxResultSize() {
141      return delegate.getMaxResultSize();
142    }
143
144    @Override
145    public long getMvccReadPoint() {
146      return delegate.getMvccReadPoint();
147    }
148
149    @Override
150    public int getBatch() {
151      return delegate.getBatch();
152    }
153  }
154
155  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
156
157    private boolean startCalled;
158    private boolean stopCalled;
159    private boolean preOpenCalled;
160    private boolean postOpenCalled;
161    private boolean preCloseCalled;
162    private boolean postCloseCalled;
163    private boolean preCompactCalled;
164    private boolean postCompactCalled;
165    private boolean preFlushCalled;
166    private boolean postFlushCalled;
167    private ConcurrentMap<String, Object> sharedData;
168
169    @Override
170    public void start(CoprocessorEnvironment e) {
171      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
172      // using new String here, so that there will be new object on each invocation
173      sharedData.putIfAbsent("test1", new Object());
174      startCalled = true;
175    }
176
177    @Override
178    public void stop(CoprocessorEnvironment e) {
179      sharedData = null;
180      stopCalled = true;
181    }
182
183    @Override
184    public Optional<RegionObserver> getRegionObserver() {
185      return Optional.of(this);
186    }
187
188    @Override
189    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
190      preOpenCalled = true;
191    }
192    @Override
193    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
194      postOpenCalled = true;
195    }
196    @Override
197    public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
198      preCloseCalled = true;
199    }
200    @Override
201    public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
202      postCloseCalled = true;
203    }
204    @Override
205    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
206        Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
207        CompactionRequest request) {
208      preCompactCalled = true;
209      return scanner;
210    }
211    @Override
212    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
213        Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
214        CompactionRequest request) {
215      postCompactCalled = true;
216    }
217
218    @Override
219    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
220        FlushLifeCycleTracker tracker) {
221      preFlushCalled = true;
222    }
223
224    @Override
225    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
226        FlushLifeCycleTracker tracker) {
227      postFlushCalled = true;
228    }
229
230    @Override
231    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
232        final Scan scan, final RegionScanner s) throws IOException {
233      return new CustomScanner(s);
234    }
235
236    boolean wasStarted() {
237      return startCalled;
238    }
239    boolean wasStopped() {
240      return stopCalled;
241    }
242    boolean wasOpened() {
243      return (preOpenCalled && postOpenCalled);
244    }
245    boolean wasClosed() {
246      return (preCloseCalled && postCloseCalled);
247    }
248    boolean wasFlushed() {
249      return (preFlushCalled && postFlushCalled);
250    }
251    boolean wasCompacted() {
252      return (preCompactCalled && postCompactCalled);
253    }
254    Map<String, Object> getSharedData() {
255      return sharedData;
256    }
257  }
258
259  public static class CoprocessorII implements RegionCoprocessor {
260    private ConcurrentMap<String, Object> sharedData;
261
262    @Override
263    public void start(CoprocessorEnvironment e) {
264      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
265      sharedData.putIfAbsent("test2", new Object());
266    }
267
268    @Override
269    public void stop(CoprocessorEnvironment e) {
270      sharedData = null;
271    }
272
273    @Override
274    public Optional<RegionObserver> getRegionObserver() {
275      return Optional.of(new RegionObserver() {
276        @Override
277        public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
278            final Get get, final List<Cell> results) throws IOException {
279          throw new RuntimeException();
280        }
281      });
282    }
283
284    Map<String, Object> getSharedData() {
285      return sharedData;
286    }
287  }
288
289  @Test
290  public void testSharedData() throws IOException {
291    TableName tableName = TableName.valueOf(name.getMethodName());
292    byte [][] families = { fam1, fam2, fam3 };
293
294    Configuration hc = initConfig();
295    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[]{}, families);
296
297    for (int i = 0; i < 3; i++) {
298      HBaseTestCase.addContent(region, fam3);
299      region.flush(true);
300    }
301
302    region.compact(false);
303
304    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
305
306    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
307    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
308    Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
309    Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
310    assertNotNull(o);
311    assertNotNull(o2);
312    // to coprocessors get different sharedDatas
313    assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
314    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
315    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
316    // make sure that all coprocessor of a class have identical sharedDatas
317    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
318    assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
319
320    // now have all Environments fail
321    try {
322      byte [] r = region.getRegionInfo().getStartKey();
323      if (r == null || r.length <= 0) {
324        // Its the start row.  Can't ask for null.  Ask for minimal key instead.
325        r = new byte [] {0};
326      }
327      Get g = new Get(r);
328      region.get(g);
329      fail();
330    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
331    }
332    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
333    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
334    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
335    c = c2 = null;
336    // perform a GC
337    System.gc();
338    // reopen the region
339    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
340    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
341    // CPimpl is unaffected, still the same reference
342    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
343    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
344    // new map and object created, hence the reference is different
345    // hence the old entry was indeed removed by the GC and new one has been created
346    Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
347    assertFalse(o3 == o2);
348    HBaseTestingUtility.closeRegionAndWAL(region);
349  }
350
351  @Test
352  public void testCoprocessorInterface() throws IOException {
353    TableName tableName = TableName.valueOf(name.getMethodName());
354    byte [][] families = { fam1, fam2, fam3 };
355
356    Configuration hc = initConfig();
357    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
358      new Class<?>[]{CoprocessorImpl.class}, families);
359    for (int i = 0; i < 3; i++) {
360      HBaseTestCase.addContent(region, fam3);
361      region.flush(true);
362    }
363
364    region.compact(false);
365
366    // HBASE-4197
367    Scan s = new Scan();
368    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
369    assertTrue(scanner instanceof CustomScanner);
370    // this would throw an exception before HBASE-4197
371    scanner.next(new ArrayList<>());
372
373    HBaseTestingUtility.closeRegionAndWAL(region);
374    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
375
376    assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
377    assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
378    assertTrue(((CoprocessorImpl)c).wasOpened());
379    assertTrue(((CoprocessorImpl)c).wasClosed());
380    assertTrue(((CoprocessorImpl)c).wasFlushed());
381    assertTrue(((CoprocessorImpl)c).wasCompacted());
382  }
383
384  HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
385      throws IOException {
386    //RegionInfo info = new RegionInfo(tableName, null, null, false);
387    HRegion r = HRegion.openHRegion(closedRegion, null);
388
389    // this following piece is a hack. currently a coprocessorHost
390    // is secretly loaded at OpenRegionHandler. we don't really
391    // start a region server here, so just manually create cphost
392    // and set it to region.
393    Configuration conf = TEST_UTIL.getConfiguration();
394    RegionCoprocessorHost host = new RegionCoprocessorHost(r,
395        Mockito.mock(RegionServerServices.class), conf);
396    r.setCoprocessorHost(host);
397
398    for (Class<?> implClass : implClasses) {
399      host.load((Class<? extends RegionCoprocessor>) implClass, Coprocessor.PRIORITY_USER, conf);
400    }
401    // we need to manually call pre- and postOpen here since the
402    // above load() is not the real case for CP loading. A CP is
403    // expected to be loaded by default from 1) configuration; or 2)
404    // HTableDescriptor. If it's loaded after HRegion initialized,
405    // the pre- and postOpen() won't be triggered automatically.
406    // Here we have to call pre and postOpen explicitly.
407    host.preOpen();
408    host.postOpen();
409    return r;
410  }
411
412  HRegion initHRegion (TableName tableName, String callingMethod,
413      Configuration conf, Class<?> [] implClasses, byte [][] families)
414      throws IOException {
415    HTableDescriptor htd = new HTableDescriptor(tableName);
416    for(byte [] family : families) {
417      htd.addFamily(new HColumnDescriptor(family));
418    }
419    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
420    RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
421        .setStartKey(null)
422        .setEndKey(null)
423        .setSplit(false)
424        .build();
425    Path path = new Path(DIR + callingMethod);
426    HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
427
428    // this following piece is a hack.
429    RegionCoprocessorHost host =
430        new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
431    r.setCoprocessorHost(host);
432
433    for (Class<?> implClass : implClasses) {
434      host.load((Class<? extends RegionCoprocessor>) implClass, Coprocessor.PRIORITY_USER, conf);
435      Coprocessor c = host.findCoprocessor(implClass.getName());
436      assertNotNull(c);
437    }
438
439    // Here we have to call pre and postOpen explicitly.
440    host.preOpen();
441    host.postOpen();
442    return r;
443  }
444
445  private Configuration initConfig() {
446    // Always compact if there is more than one store file.
447    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
448    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
449    // Increase the amount of time between client retries
450    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
451    // This size should make it so we always split using the addContent
452    // below.  After adding all data, the first region is 1.3M
453    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
454        1024 * 128);
455    TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
456        true);
457    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
458
459    return TEST_UTIL.getConfiguration();
460  }
461}