001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Durability;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.wal.WALEdit;
035import org.junit.jupiter.api.AfterAll;
036import org.junit.jupiter.api.BeforeAll;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Test that verifies we do not have memstore size negative when a postPut/Delete hook is
044 * slow/expensive and a flush is triggered at the same time the coprocessow is doing its work. To
045 * simulate this we call flush from the coprocessor itself
046 */
047@Tag(MediumTests.TAG)
048public class TestNegativeMemStoreSizeWithSlowCoprocessor {
049
050  static final Logger LOG =
051    LoggerFactory.getLogger(TestNegativeMemStoreSizeWithSlowCoprocessor.class);
052
053  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
054  private static final byte[] tableName = Bytes.toBytes("test_table");
055  private static final byte[] family = Bytes.toBytes("f");
056  private static final byte[] qualifier = Bytes.toBytes("q");
057
058  @BeforeAll
059  public static void setupBeforeClass() throws Exception {
060    Configuration conf = TEST_UTIL.getConfiguration();
061    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
062      FlushingRegionObserver.class.getName());
063    conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
064    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
065    TEST_UTIL.startMiniCluster(1);
066    TEST_UTIL.createTable(TableName.valueOf(tableName), family);
067  }
068
069  @AfterAll
070  public static void tearDownAfterClass() throws Exception {
071    TEST_UTIL.shutdownMiniCluster();
072  }
073
074  @Test
075  public void testNegativeMemstoreSize() throws IOException, InterruptedException {
076    boolean IOEthrown = false;
077    Table table = null;
078    try {
079      table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
080
081      // Adding data
082      Put put1 = new Put(Bytes.toBytes("row1"));
083      put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
084      table.put(put1);
085      Put put2 = new Put(Bytes.toBytes("row2"));
086      put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
087      table.put(put2);
088      table.put(put2);
089    } catch (IOException e) {
090      IOEthrown = true;
091    } finally {
092      assertFalse(IOEthrown, "Shouldn't have thrown an exception");
093      if (table != null) {
094        table.close();
095      }
096    }
097  }
098
099  public static class FlushingRegionObserver extends SimpleRegionObserver {
100
101    @Override
102    public void postPut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
103      final Put put, final WALEdit edit, final Durability durability) throws IOException {
104      HRegion region = (HRegion) c.getEnvironment().getRegion();
105      super.postPut(c, put, edit, durability);
106
107      if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
108        region.flush(false);
109        assertTrue(region.getMemStoreDataSize() >= 0);
110      }
111    }
112  }
113}