001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.HRegionLocation;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.StartTestingClusterOption;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.RetriesExhaustedException;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.coprocessor.ObserverContext;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
045import org.apache.hadoop.hbase.coprocessor.RegionObserver;
046import org.apache.hadoop.hbase.ipc.CallRunner;
047import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
048import org.apache.hadoop.hbase.ipc.PriorityFunction;
049import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.JVMClusterUtil;
054import org.apache.hadoop.hbase.util.Threads;
055import org.junit.jupiter.api.AfterAll;
056import org.junit.jupiter.api.BeforeAll;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062@Tag(RegionServerTests.TAG)
063@Tag(MediumTests.TAG)
064public class TestRegionServerRejectDuringAbort {
065
066  private static final Logger LOG =
067    LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);
068
069  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
070
071  private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");
072
073  private static byte[] CF = Bytes.toBytes("cf");
074
075  private static final int REGIONS_NUM = 5;
076
077  private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null);
078
079  private static volatile boolean shouldThrowTooBig = false;
080
081  @BeforeAll
082  public static void setUp() throws Exception {
083    // Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms
084    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable");
085    UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
086      CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
087    StartTestingClusterOption option =
088      StartTestingClusterOption.builder().numRegionServers(2).build();
089    UTIL.startMiniCluster(option);
090    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
091      .setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
092      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
093    UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM);
094  }
095
096  public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl {
097
098    public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority,
099      Configuration conf) {
100      super(maxQueueLength, priority, conf);
101    }
102
103    @Override
104    public boolean offer(CallRunner callRunner) {
105      if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) {
106        return false;
107      }
108      return super.offer(callRunner);
109    }
110  }
111
112  @AfterAll
113  public static void tearDown() throws Exception {
114    UTIL.shutdownMiniCluster();
115  }
116
117  /**
118   * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
119   * circuits any other logic. This means we no longer even attempt to enqueue the request onto the
120   * call queue. We verify this by using a special call queue which we can trigger to always return
121   * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not
122   * see them.
123   */
124  @Test
125  public void testRejectRequestsOnAbort() throws Exception {
126    // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
127    // the server. Disrupting meta requests messes with the test.
128    HRegionServer serverWithoutMeta = null;
129    for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
130      .getRegionServerThreads()) {
131      HRegionServer regionServer = regionServerThread.getRegionServer();
132      if (
133        regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
134          && !regionServer.getRegions(TABLE_NAME).isEmpty()
135      ) {
136        serverWithoutMeta = regionServer;
137        break;
138      }
139    }
140
141    assertNotNull(serverWithoutMeta,
142      "couldn't find a server without meta, but with test table regions");
143
144    Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
145    writer.setDaemon(true);
146    writer.start();
147
148    // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
149    // and trigger our custom queue to reject any more requests. This would typically result in
150    // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
151    // of a request is working.
152    serverWithoutMeta.abort("Abort RS for test");
153
154    UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
155    assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class);
156  }
157
158  private Runnable getWriterThreadRunnable(ServerName loadServer) {
159    return () -> {
160      try {
161        Configuration conf = UTIL.getConfiguration();
162        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
163        try (Connection conn = ConnectionFactory.createConnection(conf);
164          Table table = conn.getTableBuilder(TABLE_NAME, null)
165            .setRequestAttribute("test", new byte[] { 0 }).build()) {
166          // find the first region to exist on our test server, then submit requests to it
167          for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
168            if (regionLocation.getServerName().equals(loadServer)) {
169              submitRequestsToRegion(table, regionLocation.getRegion());
170              return;
171            }
172          }
173          throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
174        }
175      } catch (Exception e) {
176        LOG.warn("Failed to load data", e);
177        synchronized (THROWN_EXCEPTION) {
178          THROWN_EXCEPTION.set(e);
179          THROWN_EXCEPTION.notifyAll();
180        }
181      }
182    };
183  }
184
185  private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException {
186    // We will block closes of the regions with a CP, so no need to worry about the region getting
187    // reassigned. Just use the same rowkey always.
188    byte[] rowKey = getRowKeyWithin(regionInfo);
189
190    int i = 0;
191    while (true) {
192      try {
193        i++;
194        table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i)));
195      } catch (IOException e) {
196        // only catch RegionServerAbortedException once. After that, the next exception thrown
197        // is our test case
198        if (
199          !shouldThrowTooBig && e instanceof RetriesExhaustedException
200            && e.getCause() instanceof RegionServerAbortedException
201        ) {
202          shouldThrowTooBig = true;
203        } else {
204          throw e;
205        }
206      }
207
208      // small sleep to relieve pressure
209      Threads.sleep(10);
210    }
211  }
212
213  private byte[] getRowKeyWithin(RegionInfo regionInfo) {
214    byte[] rowKey;
215    // region is start of table, find one after start key
216    if (regionInfo.getStartKey().length == 0) {
217      if (regionInfo.getEndKey().length == 0) {
218        // doesn't matter, single region table
219        return Bytes.toBytes(1);
220      } else {
221        // find a row just before endkey
222        rowKey = Bytes.copy(regionInfo.getEndKey());
223        rowKey[rowKey.length - 1]--;
224        return rowKey;
225      }
226    } else {
227      return regionInfo.getStartKey();
228    }
229  }
230
231  public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver {
232
233    public SleepWhenCloseCoprocessor() {
234    }
235
236    @Override
237    public Optional<RegionObserver> getRegionObserver() {
238      return Optional.of(this);
239    }
240
241    @Override
242    public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> c,
243      boolean abortRequested) throws IOException {
244      // Wait so that the region can't close until we get the information we need from our test
245      UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
246    }
247  }
248}