001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.net.SocketTimeoutException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 036import org.apache.hadoop.hbase.coprocessor.ObserverContext; 037import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 038import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 039import org.apache.hadoop.hbase.coprocessor.RegionObserver; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.Threads; 044import org.junit.jupiter.api.AfterAll; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Tag(LargeTests.TAG) 052@Tag(ClientTests.TAG) 053public class TestClientOperationInterrupt { 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationInterrupt.class); 056 057 private static HBaseTestingUtil util; 058 private static final TableName tableName = TableName.valueOf("test"); 059 private static final byte[] dummy = Bytes.toBytes("dummy"); 060 private static final byte[] row1 = Bytes.toBytes("r1"); 061 private static final byte[] test = Bytes.toBytes("test"); 062 private static Configuration conf; 063 064 public static class TestCoprocessor implements RegionCoprocessor, RegionObserver { 065 @Override 066 public Optional<RegionObserver> getRegionObserver() { 067 return Optional.of(this); 068 } 069 070 @Override 071 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 072 final Get get, final List<Cell> results) throws IOException { 073 Threads.sleep(2500); 074 } 075 } 076 077 @BeforeAll 078 public static void setUpBeforeClass() throws Exception { 079 conf = HBaseConfiguration.create(); 080 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 081 TestCoprocessor.class.getName()); 082 util = new HBaseTestingUtil(conf); 083 util.startMiniCluster(); 084 085 Admin admin = util.getAdmin(); 086 if (admin.tableExists(tableName)) { 087 if (admin.isTableEnabled(tableName)) { 088 admin.disableTable(tableName); 089 } 090 admin.deleteTable(tableName); 091 } 092 Table ht = util.createTable(tableName, new byte[][] { dummy, test }); 093 094 Put p = new Put(row1); 095 p.addColumn(dummy, dummy, dummy); 096 ht.put(p); 097 } 098 099 @Test 100 public void testInterrupt50Percent() throws IOException, InterruptedException { 101 final AtomicInteger noEx = new AtomicInteger(0); 102 final AtomicInteger badEx = new AtomicInteger(0); 103 final AtomicInteger noInt = new AtomicInteger(0); 104 final AtomicInteger done = new AtomicInteger(0); 105 List<Thread> threads = new ArrayList<>(); 106 107 final int nbThread = 100; 108 109 for (int i = 0; i < nbThread; i++) { 110 Thread t = new Thread() { 111 @Override 112 public void run() { 113 try { 114 Table ht = util.getConnection().getTable(tableName); 115 Result r = ht.get(new Get(row1)); 116 noEx.incrementAndGet(); 117 } catch (IOException e) { 118 LOG.info("exception", e); 119 if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) { 120 badEx.incrementAndGet(); 121 } else { 122 if (Thread.currentThread().isInterrupted()) { 123 noInt.incrementAndGet(); 124 LOG.info("The thread should NOT be with the 'interrupt' status."); 125 } 126 } 127 } finally { 128 done.incrementAndGet(); 129 } 130 } 131 }; 132 t.setName("TestClientOperationInterrupt #" + i); 133 threads.add(t); 134 t.start(); 135 } 136 int expectedNoExNum = nbThread / 2; 137 138 for (int i = 0; i < nbThread / 2; i++) { 139 if (threads.get(i).getState().equals(Thread.State.TERMINATED)) { 140 expectedNoExNum--; 141 } 142 threads.get(i).interrupt(); 143 } 144 145 boolean stillAlive = true; 146 while (stillAlive) { 147 stillAlive = false; 148 for (Thread t : threads) { 149 if (t.isAlive()) { 150 stillAlive = true; 151 } 152 } 153 Threads.sleep(10); 154 } 155 156 assertFalse(Thread.currentThread().isInterrupted()); 157 assertTrue(noEx.get() == expectedNoExNum && badEx.get() == 0, 158 " noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get()); 159 160 // The problem here is that we need the server to free its handlers to handle all operations 161 while (done.get() != nbThread) { 162 Thread.sleep(1); 163 } 164 165 Table ht = util.getConnection().getTable(tableName); 166 Result r = ht.get(new Get(row1)); 167 assertFalse(r.isEmpty()); 168 } 169 170 @AfterAll 171 public static void tearDownAfterClass() throws Exception { 172 util.shutdownMiniCluster(); 173 } 174}