001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.SocketTimeoutException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.AfterClass;
043import org.junit.Assert;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({MediumTests.class, ClientTests.class})
052public class TestClientOperationInterrupt {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestClientOperationInterrupt.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationInterrupt.class);
059
060  private static HBaseTestingUtility util;
061  private static final TableName tableName = TableName.valueOf("test");
062  private static final byte[] dummy = Bytes.toBytes("dummy");
063  private static final byte[] row1 = Bytes.toBytes("r1");
064  private static final byte[] test = Bytes.toBytes("test");
065  private static Configuration conf;
066
067  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
068    @Override
069    public Optional<RegionObserver> getRegionObserver() {
070      return Optional.of(this);
071    }
072
073    @Override
074    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
075                         final Get get, final List<Cell> results) throws IOException {
076      Threads.sleep(2500);
077    }
078  }
079
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    conf = HBaseConfiguration.create();
084    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
085        TestCoprocessor.class.getName());
086    util = new HBaseTestingUtility(conf);
087    util.startMiniCluster();
088
089    Admin admin = util.getAdmin();
090    if (admin.tableExists(tableName)) {
091      if (admin.isTableEnabled(tableName)) {
092        admin.disableTable(tableName);
093      }
094      admin.deleteTable(tableName);
095    }
096    Table ht = util.createTable(tableName, new byte[][]{dummy, test});
097
098    Put p = new Put(row1);
099    p.addColumn(dummy, dummy, dummy);
100    ht.put(p);
101  }
102
103
104  @Test
105  public void testInterrupt50Percent() throws IOException, InterruptedException {
106    final AtomicInteger noEx = new AtomicInteger(0);
107    final AtomicInteger badEx = new AtomicInteger(0);
108    final AtomicInteger noInt = new AtomicInteger(0);
109    final AtomicInteger done = new AtomicInteger(0);
110    List<Thread> threads = new ArrayList<>();
111
112    final int nbThread = 100;
113
114    for (int i = 0; i < nbThread; i++) {
115      Thread t = new Thread() {
116        @Override
117        public void run() {
118          try {
119            Table ht = util.getConnection().getTable(tableName);
120            Result r = ht.get(new Get(row1));
121            noEx.incrementAndGet();
122          } catch (IOException e) {
123            LOG.info("exception", e);
124            if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
125              badEx.incrementAndGet();
126            } else {
127              if (Thread.currentThread().isInterrupted()) {
128                noInt.incrementAndGet();
129                LOG.info("The thread should NOT be with the 'interrupt' status.");
130              }
131            }
132          } finally {
133            done.incrementAndGet();
134          }
135        }
136      };
137      t.setName("TestClientOperationInterrupt #" + i);
138      threads.add(t);
139      t.start();
140    }
141
142    for (int i = 0; i < nbThread / 2; i++) {
143      threads.get(i).interrupt();
144    }
145
146
147    boolean stillAlive = true;
148    while (stillAlive) {
149      stillAlive = false;
150      for (Thread t : threads) {
151        if (t.isAlive()) {
152          stillAlive = true;
153        }
154      }
155      Threads.sleep(10);
156    }
157
158    Assert.assertFalse(Thread.currentThread().isInterrupted());
159
160    Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
161        noEx.get() == nbThread / 2 && badEx.get() == 0);
162
163    // The problem here is that we need the server to free its handlers to handle all operations
164    while (done.get() != nbThread){
165      Thread.sleep(1);
166    }
167
168    Table ht = util.getConnection().getTable(tableName);
169    Result r = ht.get(new Get(row1));
170    Assert.assertFalse(r.isEmpty());
171  }
172
173  @AfterClass
174  public static void tearDownAfterClass() throws Exception {
175    util.shutdownMiniCluster();
176  }
177}