001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.fail;
021
022import java.io.IOException;
023import java.util.Optional;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Delete;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Increment;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Threads;
040import org.junit.jupiter.api.AfterAll;
041import org.junit.jupiter.api.BeforeAll;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044import org.junit.jupiter.api.TestInfo;
045
046@Tag(MediumTests.TAG)
047public class TestSettingTimeoutOnBlockingPoint {
048
049  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
050  private static final byte[] FAM = Bytes.toBytes("f");
051  private static final byte[] ROW1 = Bytes.toBytes("row1");
052  private static final byte[] ROW2 = Bytes.toBytes("row2");
053
054  @BeforeAll
055  public static void setUpBeforeClass() throws Exception {
056    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
057    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
058    // simulate queue blocking
059    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
060    TEST_UTIL.startMiniCluster(2);
061  }
062
063  @AfterAll
064  public static void tearDownAfterClass() throws Exception {
065    TEST_UTIL.shutdownMiniCluster();
066  }
067
068  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
069    public static final int SLEEP_TIME = 10000;
070
071    @Override
072    public Optional<RegionObserver> getRegionObserver() {
073      return Optional.of(this);
074    }
075
076    @Override
077    public Result preIncrementAfterRowLock(
078      final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Increment increment)
079      throws IOException {
080      Threads.sleep(SLEEP_TIME);
081      return null;
082    }
083  }
084
085  @Test
086  public void testRowLock(TestInfo testInfo) throws IOException {
087    TableDescriptor hdt =
088      TEST_UTIL.createModifyableTableDescriptor(testInfo.getTestMethod().get().getName())
089        .setCoprocessor(SleepCoprocessor.class.getName()).build();
090    TEST_UTIL.createTable(hdt, new byte[][] { FAM }, TEST_UTIL.getConfiguration());
091    TableName tableName = hdt.getTableName();
092    Thread incrementThread = new Thread(() -> {
093      try {
094        try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
095          table.incrementColumnValue(ROW1, FAM, FAM, 1);
096        }
097      } catch (IOException e) {
098        fail(e.getMessage());
099      }
100    });
101    Thread getThread = new Thread(() -> {
102      try (Table table =
103        TEST_UTIL.getConnection().getTableBuilder(tableName, null).setRpcTimeout(1000).build()) {
104        Delete delete = new Delete(ROW1);
105        table.delete(delete);
106      } catch (IOException e) {
107        fail(e.getMessage());
108      }
109    });
110
111    incrementThread.start();
112    Threads.sleep(1000);
113    getThread.start();
114    Threads.sleep(2000);
115    try (Table table =
116      TEST_UTIL.getConnection().getTableBuilder(tableName, null).setRpcTimeout(1000).build()) {
117      // We have only two handlers. The first thread will get a write lock for row1 and occupy
118      // the first handler. The second thread need a read lock for row1, it should quit after 1000
119      // ms and give back the handler because it can not get the lock in time.
120      // So we can get the value using the second handler.
121      table.get(new Get(ROW2)); // Will throw exception if the timeout checking is failed
122    } finally {
123      incrementThread.interrupt();
124      getThread.interrupt();
125    }
126  }
127}