001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RetriesExhaustedException;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.RegionObserver;
047import org.apache.hadoop.hbase.ipc.CallRunner;
048import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
049import org.apache.hadoop.hbase.ipc.PriorityFunction;
050import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil;
055import org.apache.hadoop.hbase.util.Threads;
056import org.junit.AfterClass;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Category({ RegionServerTests.class, MediumTests.class })
065public class TestRegionServerRejectDuringAbort {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class);
070
071  private static final Logger LOG =
072    LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);
073
074  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");
077
078  private static byte[] CF = Bytes.toBytes("cf");
079
080  private static final int REGIONS_NUM = 5;
081
082  private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null);
083
084  private static volatile boolean shouldThrowTooBig = false;
085
086  @BeforeClass
087  public static void setUp() throws Exception {
088    // Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms
089    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
090    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
091      CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
092    StartTestingClusterOption option =
093      StartTestingClusterOption.builder().numRegionServers(2).build();
094    UTIL.startMiniCluster(option);
095    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
096      .setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
097      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
098    UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM);
099  }
100
101  public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl {
102
103    public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority,
104      Configuration conf) {
105      super(maxQueueLength, priority, conf);
106    }
107
108    @Override
109    public boolean offer(CallRunner callRunner) {
110      if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) {
111        return false;
112      }
113      return super.offer(callRunner);
114    }
115  }
116
117  @AfterClass
118  public static void tearDown() throws Exception {
119    UTIL.shutdownMiniCluster();
120  }
121
122  /**
123   * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
124   * circuits any other logic. This means we no longer even attempt to enqueue the request onto the
125   * call queue. We verify this by using a special call queue which we can trigger to always return
126   * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not
127   * see them.
128   */
129  @Test
130  public void testRejectRequestsOnAbort() throws Exception {
131    // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
132    // the server. Disrupting meta requests messes with the test.
133    HRegionServer serverWithoutMeta = null;
134    for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
135      .getRegionServerThreads()) {
136      HRegionServer regionServer = regionServerThread.getRegionServer();
137      if (
138        regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
139          && !regionServer.getRegions(TABLE_NAME).isEmpty()
140      ) {
141        serverWithoutMeta = regionServer;
142        break;
143      }
144    }
145
146    assertNotNull("couldn't find a server without meta, but with test table regions",
147      serverWithoutMeta);
148
149    Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
150    writer.setDaemon(true);
151    writer.start();
152
153    // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
154    // and trigger our custom queue to reject any more requests. This would typically result in
155    // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
156    // of a request is working.
157    serverWithoutMeta.abort("Abort RS for test");
158
159    UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
160    assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class);
161  }
162
163  private Runnable getWriterThreadRunnable(ServerName loadServer) {
164    return () -> {
165      try {
166        Configuration conf = UTIL.getConfiguration();
167        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
168        try (Connection conn = ConnectionFactory.createConnection(conf);
169          Table table = conn.getTableBuilder(TABLE_NAME, null)
170            .setRequestAttribute("test", new byte[] { 0 }).build()) {
171          // find the first region to exist on our test server, then submit requests to it
172          for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
173            if (regionLocation.getServerName().equals(loadServer)) {
174              submitRequestsToRegion(table, regionLocation.getRegion());
175              return;
176            }
177          }
178          throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
179        }
180      } catch (Exception e) {
181        LOG.warn("Failed to load data", e);
182        synchronized (THROWN_EXCEPTION) {
183          THROWN_EXCEPTION.set(e);
184          THROWN_EXCEPTION.notifyAll();
185        }
186      }
187    };
188  }
189
190  private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException {
191    // We will block closes of the regions with a CP, so no need to worry about the region getting
192    // reassigned. Just use the same rowkey always.
193    byte[] rowKey = getRowKeyWithin(regionInfo);
194
195    int i = 0;
196    while (true) {
197      try {
198        i++;
199        table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i)));
200      } catch (IOException e) {
201        // only catch RegionServerAbortedException once. After that, the next exception thrown
202        // is our test case
203        if (
204          !shouldThrowTooBig && e instanceof RetriesExhaustedException
205            && e.getCause() instanceof RegionServerAbortedException
206        ) {
207          shouldThrowTooBig = true;
208        } else {
209          throw e;
210        }
211      }
212
213      // small sleep to relieve pressure
214      Threads.sleep(10);
215    }
216  }
217
218  private byte[] getRowKeyWithin(RegionInfo regionInfo) {
219    byte[] rowKey;
220    // region is start of table, find one after start key
221    if (regionInfo.getStartKey().length == 0) {
222      if (regionInfo.getEndKey().length == 0) {
223        // doesn't matter, single region table
224        return Bytes.toBytes(1);
225      } else {
226        // find a row just before endkey
227        rowKey = Bytes.copy(regionInfo.getEndKey());
228        rowKey[rowKey.length - 1]--;
229        return rowKey;
230      }
231    } else {
232      return regionInfo.getStartKey();
233    }
234  }
235
236  public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver {
237
238    public SleepWhenCloseCoprocessor() {
239    }
240
241    @Override
242    public Optional<RegionObserver> getRegionObserver() {
243      return Optional.of(this);
244    }
245
246    @Override
247    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
248      throws IOException {
249      // Wait so that the region can't close until we get the information we need from our test
250      UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
251    }
252  }
253}