001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam3;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.Set;
035import java.util.concurrent.ConcurrentMap;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.Coprocessor;
040import org.apache.hadoop.hbase.CoprocessorEnvironment;
041import org.apache.hadoop.hbase.ExtendedCell;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.HTestConst;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Get;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionInfoBuilder;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.regionserver.ChunkCreator;
054import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
055import org.apache.hadoop.hbase.regionserver.HRegion;
056import org.apache.hadoop.hbase.regionserver.InternalScanner;
057import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
058import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
059import org.apache.hadoop.hbase.regionserver.RegionScanner;
060import org.apache.hadoop.hbase.regionserver.RegionServerServices;
061import org.apache.hadoop.hbase.regionserver.ScanType;
062import org.apache.hadoop.hbase.regionserver.ScannerContext;
063import org.apache.hadoop.hbase.regionserver.Store;
064import org.apache.hadoop.hbase.regionserver.StoreFile;
065import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
066import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
067import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.junit.ClassRule;
070import org.junit.Rule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.junit.rules.TestName;
074import org.mockito.Mockito;
075
076@Category({ CoprocessorTests.class, MediumTests.class })
077public class TestCoprocessorInterface {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestCoprocessorInterface.class);
082
083  @Rule
084  public TestName name = new TestName();
085  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
086  static final Path DIR = TEST_UTIL.getDataTestDir();
087
088  private static class CustomScanner implements RegionScanner {
089
090    private RegionScanner delegate;
091
092    public CustomScanner(RegionScanner delegate) {
093      this.delegate = delegate;
094    }
095
096    @Override
097    public boolean next(List<? super ExtendedCell> results) throws IOException {
098      return delegate.next(results);
099    }
100
101    @Override
102    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
103      throws IOException {
104      return delegate.next(result, scannerContext);
105    }
106
107    @Override
108    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
109      return delegate.nextRaw(result);
110    }
111
112    @Override
113    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
114      throws IOException {
115      return delegate.nextRaw(result, context);
116    }
117
118    @Override
119    public Set<Path> getFilesRead() {
120      return delegate.getFilesRead();
121    }
122
123    @Override
124    public void close() throws IOException {
125      delegate.close();
126    }
127
128    @Override
129    public RegionInfo getRegionInfo() {
130      return delegate.getRegionInfo();
131    }
132
133    @Override
134    public boolean isFilterDone() throws IOException {
135      return delegate.isFilterDone();
136    }
137
138    @Override
139    public boolean reseek(byte[] row) throws IOException {
140      return false;
141    }
142
143    @Override
144    public long getMaxResultSize() {
145      return delegate.getMaxResultSize();
146    }
147
148    @Override
149    public long getMvccReadPoint() {
150      return delegate.getMvccReadPoint();
151    }
152
153    @Override
154    public int getBatch() {
155      return delegate.getBatch();
156    }
157  }
158
159  public static class CoprocessorImpl implements RegionCoprocessor, RegionObserver {
160
161    private boolean startCalled;
162    private boolean stopCalled;
163    private boolean preOpenCalled;
164    private boolean postOpenCalled;
165    private boolean preCloseCalled;
166    private boolean postCloseCalled;
167    private boolean preCompactCalled;
168    private boolean postCompactCalled;
169    private boolean preFlushCalled;
170    private boolean postFlushCalled;
171    private ConcurrentMap<String, Object> sharedData;
172
173    @Override
174    public void start(CoprocessorEnvironment e) {
175      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
176      // using new String here, so that there will be new object on each invocation
177      sharedData.putIfAbsent("test1", new Object());
178      startCalled = true;
179    }
180
181    @Override
182    public void stop(CoprocessorEnvironment e) {
183      sharedData = null;
184      stopCalled = true;
185    }
186
187    @Override
188    public Optional<RegionObserver> getRegionObserver() {
189      return Optional.of(this);
190    }
191
192    @Override
193    public void preOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
194      preOpenCalled = true;
195    }
196
197    @Override
198    public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e) {
199      postOpenCalled = true;
200    }
201
202    @Override
203    public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
204      boolean abortRequested) {
205      preCloseCalled = true;
206    }
207
208    @Override
209    public void postClose(ObserverContext<? extends RegionCoprocessorEnvironment> e,
210      boolean abortRequested) {
211      postCloseCalled = true;
212    }
213
214    @Override
215    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e,
216      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
217      CompactionRequest request) {
218      preCompactCalled = true;
219      return scanner;
220    }
221
222    @Override
223    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> e, Store store,
224      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
225      postCompactCalled = true;
226    }
227
228    @Override
229    public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
230      FlushLifeCycleTracker tracker) {
231      preFlushCalled = true;
232    }
233
234    @Override
235    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> e,
236      FlushLifeCycleTracker tracker) {
237      postFlushCalled = true;
238    }
239
240    @Override
241    public RegionScanner postScannerOpen(
242      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan,
243      final RegionScanner s) throws IOException {
244      return new CustomScanner(s);
245    }
246
247    boolean wasStarted() {
248      return startCalled;
249    }
250
251    boolean wasStopped() {
252      return stopCalled;
253    }
254
255    boolean wasOpened() {
256      return (preOpenCalled && postOpenCalled);
257    }
258
259    boolean wasClosed() {
260      return (preCloseCalled && postCloseCalled);
261    }
262
263    boolean wasFlushed() {
264      return (preFlushCalled && postFlushCalled);
265    }
266
267    boolean wasCompacted() {
268      return (preCompactCalled && postCompactCalled);
269    }
270
271    Map<String, Object> getSharedData() {
272      return sharedData;
273    }
274  }
275
276  public static class CoprocessorII implements RegionCoprocessor {
277    private ConcurrentMap<String, Object> sharedData;
278
279    @Override
280    public void start(CoprocessorEnvironment e) {
281      sharedData = ((RegionCoprocessorEnvironment) e).getSharedData();
282      sharedData.putIfAbsent("test2", new Object());
283    }
284
285    @Override
286    public void stop(CoprocessorEnvironment e) {
287      sharedData = null;
288    }
289
290    @Override
291    public Optional<RegionObserver> getRegionObserver() {
292      return Optional.of(new RegionObserver() {
293        @Override
294        public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
295          final Get get, final List<Cell> results) throws IOException {
296          throw new RuntimeException();
297        }
298      });
299    }
300
301    Map<String, Object> getSharedData() {
302      return sharedData;
303    }
304  }
305
306  @Test
307  public void testSharedData() throws IOException {
308    TableName tableName = TableName.valueOf(name.getMethodName());
309    byte[][] families = { fam1, fam2, fam3 };
310
311    Configuration hc = initConfig();
312    HRegion region = initHRegion(tableName, name.getMethodName(), hc, new Class<?>[] {}, families);
313
314    for (int i = 0; i < 3; i++) {
315      HTestConst.addContent(region, fam3);
316      region.flush(true);
317    }
318
319    region.compact(false);
320
321    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
322
323    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
324    Coprocessor c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
325    Object o = ((CoprocessorImpl) c).getSharedData().get("test1");
326    Object o2 = ((CoprocessorII) c2).getSharedData().get("test2");
327    assertNotNull(o);
328    assertNotNull(o2);
329    // to coprocessors get different sharedDatas
330    assertFalse(((CoprocessorImpl) c).getSharedData() == ((CoprocessorII) c2).getSharedData());
331    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
332    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
333    // make sure that all coprocessor of a class have identical sharedDatas
334    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
335    assertTrue(((CoprocessorII) c2).getSharedData().get("test2") == o2);
336
337    // now have all Environments fail
338    try {
339      byte[] r = region.getRegionInfo().getStartKey();
340      if (r == null || r.length <= 0) {
341        // Its the start row. Can't ask for null. Ask for minimal key instead.
342        r = new byte[] { 0 };
343      }
344      Get g = new Get(r);
345      region.get(g);
346      fail();
347    } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
348    }
349    assertNull(region.getCoprocessorHost().findCoprocessor(CoprocessorII.class));
350    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
351    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
352    c = c2 = null;
353    // perform a GC
354    System.gc();
355    // reopen the region
356    region = reopenRegion(region, CoprocessorImpl.class, CoprocessorII.class);
357    c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
358    // CPimpl is unaffected, still the same reference
359    assertTrue(((CoprocessorImpl) c).getSharedData().get("test1") == o);
360    c2 = region.getCoprocessorHost().findCoprocessor(CoprocessorII.class);
361    // new map and object created, hence the reference is different
362    // hence the old entry was indeed removed by the GC and new one has been created
363    Object o3 = ((CoprocessorII) c2).getSharedData().get("test2");
364    assertFalse(o3 == o2);
365    HBaseTestingUtil.closeRegionAndWAL(region);
366  }
367
368  @Test
369  public void testCoprocessorInterface() throws IOException {
370    TableName tableName = TableName.valueOf(name.getMethodName());
371    byte[][] families = { fam1, fam2, fam3 };
372
373    Configuration hc = initConfig();
374    HRegion region = initHRegion(tableName, name.getMethodName(), hc,
375      new Class<?>[] { CoprocessorImpl.class }, families);
376    for (int i = 0; i < 3; i++) {
377      HTestConst.addContent(region, fam3);
378      region.flush(true);
379    }
380
381    region.compact(false);
382
383    // HBASE-4197
384    Scan s = new Scan();
385    RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
386    assertTrue(scanner instanceof CustomScanner);
387    // this would throw an exception before HBASE-4197
388    scanner.next(new ArrayList<>());
389
390    HBaseTestingUtil.closeRegionAndWAL(region);
391    Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
392
393    assertTrue("Coprocessor not started", ((CoprocessorImpl) c).wasStarted());
394    assertTrue("Coprocessor not stopped", ((CoprocessorImpl) c).wasStopped());
395    assertTrue(((CoprocessorImpl) c).wasOpened());
396    assertTrue(((CoprocessorImpl) c).wasClosed());
397    assertTrue(((CoprocessorImpl) c).wasFlushed());
398    assertTrue(((CoprocessorImpl) c).wasCompacted());
399  }
400
401  HRegion reopenRegion(final HRegion closedRegion, Class<?>... implClasses) throws IOException {
402    // RegionInfo info = new RegionInfo(tableName, null, null, false);
403    HRegion r = HRegion.openHRegion(closedRegion, null);
404
405    // this following piece is a hack. currently a coprocessorHost
406    // is secretly loaded at OpenRegionHandler. we don't really
407    // start a region server here, so just manually create cphost
408    // and set it to region.
409    Configuration conf = TEST_UTIL.getConfiguration();
410    RegionCoprocessorHost host =
411      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
412    r.setCoprocessorHost(host);
413
414    for (Class<?> implClass : implClasses) {
415      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
416    }
417    // we need to manually call pre- and postOpen here since the
418    // above load() is not the real case for CP loading. A CP is
419    // expected to be loaded by default from 1) configuration; or 2)
420    // HTableDescriptor. If it's loaded after HRegion initialized,
421    // the pre- and postOpen() won't be triggered automatically.
422    // Here we have to call pre and postOpen explicitly.
423    host.preOpen();
424    host.postOpen();
425    return r;
426  }
427
428  HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
429    Class<?>[] implClasses, byte[][] families) throws IOException {
430    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
431    for (byte[] family : families) {
432      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
433    }
434    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
435      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
436    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(null).setEndKey(null)
437      .setSplit(false).build();
438    Path path = new Path(DIR + callingMethod);
439    HRegion r = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
440
441    // this following piece is a hack.
442    RegionCoprocessorHost host =
443      new RegionCoprocessorHost(r, Mockito.mock(RegionServerServices.class), conf);
444    r.setCoprocessorHost(host);
445
446    for (Class<?> implClass : implClasses) {
447      host.load(implClass.asSubclass(RegionCoprocessor.class), Coprocessor.PRIORITY_USER, conf);
448      Coprocessor c = host.findCoprocessor(implClass.getName());
449      assertNotNull(c);
450    }
451
452    // Here we have to call pre and postOpen explicitly.
453    host.preOpen();
454    host.postOpen();
455    return r;
456  }
457
458  private Configuration initConfig() {
459    // Always compact if there is more than one store file.
460    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
461    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
462    // Increase the amount of time between client retries
463    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
464    // This size should make it so we always split using the addContent
465    // below. After adding all data, the first region is 1.3M
466    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
467    TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
468
469    return TEST_UTIL.getConfiguration();
470  }
471}