001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Durability;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.regionserver.HRegion;
030import org.apache.hadoop.hbase.testclassification.LargeTests;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.wal.WALEdit;
033import org.junit.AfterClass;
034import org.junit.Assert;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Test that verifies we do not have memstore size negative when a postPut/Delete hook is
044 * slow/expensive and a flush is triggered at the same time the coprocessow is doing its work. To
045 * simulate this we call flush from the coprocessor itself
046 */
047@Category(LargeTests.class)
048public class TestNegativeMemStoreSizeWithSlowCoprocessor {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestNegativeMemStoreSizeWithSlowCoprocessor.class);
053
054  static final Logger LOG =
055      LoggerFactory.getLogger(TestNegativeMemStoreSizeWithSlowCoprocessor.class);
056
057  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
058  private static final byte[] tableName = Bytes.toBytes("test_table");
059  private static final byte[] family = Bytes.toBytes("f");
060  private static final byte[] qualifier = Bytes.toBytes("q");
061
062  @BeforeClass
063  public static void setupBeforeClass() throws Exception {
064    Configuration conf = TEST_UTIL.getConfiguration();
065    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
066      FlushingRegionObserver.class.getName());
067    conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
068    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
069    TEST_UTIL.startMiniCluster(1);
070    TEST_UTIL.createTable(TableName.valueOf(tableName), family);
071  }
072
073  @AfterClass
074  public static void tearDownAfterClass() throws Exception {
075    TEST_UTIL.shutdownMiniCluster();
076  }
077
078  @Test
079  public void testNegativeMemstoreSize() throws IOException, InterruptedException {
080    boolean IOEthrown = false;
081    Table table = null;
082    try {
083      table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
084
085      // Adding data
086      Put put1 = new Put(Bytes.toBytes("row1"));
087      put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
088      table.put(put1);
089      Put put2 = new Put(Bytes.toBytes("row2"));
090      put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
091      table.put(put2);
092      table.put(put2);
093    } catch (IOException e) {
094      IOEthrown = true;
095    } finally {
096      Assert.assertFalse("Shouldn't have thrown an exception", IOEthrown);
097      if (table != null) {
098        table.close();
099      }
100    }
101  }
102
103  public static class FlushingRegionObserver extends SimpleRegionObserver {
104
105    @Override
106    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
107        final WALEdit edit, final Durability durability) throws IOException {
108      HRegion region = (HRegion) c.getEnvironment().getRegion();
109      super.postPut(c, put, edit, durability);
110
111      if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
112        region.flush(false);
113        Assert.assertTrue(region.getMemStoreDataSize() >= 0);
114      }
115    }
116  }
117}