001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.SocketTimeoutException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.AfterClass;
043import org.junit.Assert;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({ LargeTests.class, ClientTests.class })
052public class TestClientOperationInterrupt {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestClientOperationInterrupt.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationInterrupt.class);
059
060  private static HBaseTestingUtility util;
061  private static final TableName tableName = TableName.valueOf("test");
062  private static final byte[] dummy = Bytes.toBytes("dummy");
063  private static final byte[] row1 = Bytes.toBytes("r1");
064  private static final byte[] test = Bytes.toBytes("test");
065  private static Configuration conf;
066
067  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
068    @Override
069    public Optional<RegionObserver> getRegionObserver() {
070      return Optional.of(this);
071    }
072
073    @Override
074    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
075      final List<Cell> results) throws IOException {
076      Threads.sleep(2500);
077    }
078  }
079
080  @BeforeClass
081  public static void setUpBeforeClass() throws Exception {
082    conf = HBaseConfiguration.create();
083    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
084      TestCoprocessor.class.getName());
085    util = new HBaseTestingUtility(conf);
086    util.startMiniCluster();
087
088    Admin admin = util.getAdmin();
089    if (admin.tableExists(tableName)) {
090      if (admin.isTableEnabled(tableName)) {
091        admin.disableTable(tableName);
092      }
093      admin.deleteTable(tableName);
094    }
095    Table ht = util.createTable(tableName, new byte[][] { dummy, test });
096
097    Put p = new Put(row1);
098    p.addColumn(dummy, dummy, dummy);
099    ht.put(p);
100  }
101
102  @Test
103  public void testInterrupt50Percent() throws IOException, InterruptedException {
104    final AtomicInteger noEx = new AtomicInteger(0);
105    final AtomicInteger badEx = new AtomicInteger(0);
106    final AtomicInteger noInt = new AtomicInteger(0);
107    final AtomicInteger done = new AtomicInteger(0);
108    List<Thread> threads = new ArrayList<>();
109
110    final int nbThread = 100;
111
112    for (int i = 0; i < nbThread; i++) {
113      Thread t = new Thread() {
114        @Override
115        public void run() {
116          try {
117            Table ht = util.getConnection().getTable(tableName);
118            Result r = ht.get(new Get(row1));
119            noEx.incrementAndGet();
120          } catch (IOException e) {
121            LOG.info("exception", e);
122            if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
123              badEx.incrementAndGet();
124            } else {
125              if (Thread.currentThread().isInterrupted()) {
126                noInt.incrementAndGet();
127                LOG.info("The thread should NOT be with the 'interrupt' status.");
128              }
129            }
130          } finally {
131            done.incrementAndGet();
132          }
133        }
134      };
135      t.setName("TestClientOperationInterrupt #" + i);
136      threads.add(t);
137      t.start();
138    }
139    int expectedNoExNum = nbThread / 2;
140
141    for (int i = 0; i < nbThread / 2; i++) {
142      if (threads.get(i).getState().equals(Thread.State.TERMINATED)) {
143        expectedNoExNum--;
144      }
145      threads.get(i).interrupt();
146    }
147
148    boolean stillAlive = true;
149    while (stillAlive) {
150      stillAlive = false;
151      for (Thread t : threads) {
152        if (t.isAlive()) {
153          stillAlive = true;
154        }
155      }
156      Threads.sleep(10);
157    }
158
159    Assert.assertFalse(Thread.currentThread().isInterrupted());
160    Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
161      noEx.get() == expectedNoExNum && badEx.get() == 0);
162
163    // The problem here is that we need the server to free its handlers to handle all operations
164    while (done.get() != nbThread) {
165      Thread.sleep(1);
166    }
167
168    Table ht = util.getConnection().getTable(tableName);
169    Result r = ht.get(new Get(row1));
170    Assert.assertFalse(r.isEmpty());
171  }
172
173  @AfterClass
174  public static void tearDownAfterClass() throws Exception {
175    util.shutdownMiniCluster();
176  }
177}