001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.net.SocketTimeoutException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
036import org.apache.hadoop.hbase.coprocessor.ObserverContext;
037import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
038import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
039import org.apache.hadoop.hbase.coprocessor.RegionObserver;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.Threads;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Tag(LargeTests.TAG)
052@Tag(ClientTests.TAG)
053public class TestClientOperationInterrupt {
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationInterrupt.class);
056
057  private static HBaseTestingUtil util;
058  private static final TableName tableName = TableName.valueOf("test");
059  private static final byte[] dummy = Bytes.toBytes("dummy");
060  private static final byte[] row1 = Bytes.toBytes("r1");
061  private static final byte[] test = Bytes.toBytes("test");
062  private static Configuration conf;
063
064  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
065    @Override
066    public Optional<RegionObserver> getRegionObserver() {
067      return Optional.of(this);
068    }
069
070    @Override
071    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
072      final Get get, final List<Cell> results) throws IOException {
073      Threads.sleep(2500);
074    }
075  }
076
077  @BeforeAll
078  public static void setUpBeforeClass() throws Exception {
079    conf = HBaseConfiguration.create();
080    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
081      TestCoprocessor.class.getName());
082    util = new HBaseTestingUtil(conf);
083    util.startMiniCluster();
084
085    Admin admin = util.getAdmin();
086    if (admin.tableExists(tableName)) {
087      if (admin.isTableEnabled(tableName)) {
088        admin.disableTable(tableName);
089      }
090      admin.deleteTable(tableName);
091    }
092    Table ht = util.createTable(tableName, new byte[][] { dummy, test });
093
094    Put p = new Put(row1);
095    p.addColumn(dummy, dummy, dummy);
096    ht.put(p);
097  }
098
099  @Test
100  public void testInterrupt50Percent() throws IOException, InterruptedException {
101    final AtomicInteger noEx = new AtomicInteger(0);
102    final AtomicInteger badEx = new AtomicInteger(0);
103    final AtomicInteger noInt = new AtomicInteger(0);
104    final AtomicInteger done = new AtomicInteger(0);
105    List<Thread> threads = new ArrayList<>();
106
107    final int nbThread = 100;
108
109    for (int i = 0; i < nbThread; i++) {
110      Thread t = new Thread() {
111        @Override
112        public void run() {
113          try {
114            Table ht = util.getConnection().getTable(tableName);
115            Result r = ht.get(new Get(row1));
116            noEx.incrementAndGet();
117          } catch (IOException e) {
118            LOG.info("exception", e);
119            if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
120              badEx.incrementAndGet();
121            } else {
122              if (Thread.currentThread().isInterrupted()) {
123                noInt.incrementAndGet();
124                LOG.info("The thread should NOT be with the 'interrupt' status.");
125              }
126            }
127          } finally {
128            done.incrementAndGet();
129          }
130        }
131      };
132      t.setName("TestClientOperationInterrupt #" + i);
133      threads.add(t);
134      t.start();
135    }
136    int expectedNoExNum = nbThread / 2;
137
138    for (int i = 0; i < nbThread / 2; i++) {
139      if (threads.get(i).getState().equals(Thread.State.TERMINATED)) {
140        expectedNoExNum--;
141      }
142      threads.get(i).interrupt();
143    }
144
145    boolean stillAlive = true;
146    while (stillAlive) {
147      stillAlive = false;
148      for (Thread t : threads) {
149        if (t.isAlive()) {
150          stillAlive = true;
151        }
152      }
153      Threads.sleep(10);
154    }
155
156    assertFalse(Thread.currentThread().isInterrupted());
157    assertTrue(noEx.get() == expectedNoExNum && badEx.get() == 0,
158      " noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get());
159
160    // The problem here is that we need the server to free its handlers to handle all operations
161    while (done.get() != nbThread) {
162      Thread.sleep(1);
163    }
164
165    Table ht = util.getConnection().getTable(tableName);
166    Result r = ht.get(new Get(row1));
167    assertFalse(r.isEmpty());
168  }
169
170  @AfterAll
171  public static void tearDownAfterClass() throws Exception {
172    util.shutdownMiniCluster();
173  }
174}