001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
038import org.apache.hadoop.hbase.StartTestingClusterOption;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Durability;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
048import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
049import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.RegionObserver;
054import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
055import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
056import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.RegionServerTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.JVMClusterUtil;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hdfs.DFSConfigKeys;
064import org.apache.hadoop.hdfs.MiniDFSCluster;
065import org.junit.jupiter.api.AfterEach;
066import org.junit.jupiter.api.BeforeEach;
067import org.junit.jupiter.api.Tag;
068import org.junit.jupiter.api.Test;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * Tests around regionserver shutdown and abort
074 */
075@Tag(RegionServerTests.TAG)
076@Tag(MediumTests.TAG)
077public class TestRegionServerAbort {
078
079  private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAbort.class);
082
083  private HBaseTestingUtil testUtil;
084  private Configuration conf;
085  private MiniDFSCluster dfsCluster;
086  private SingleProcessHBaseCluster cluster;
087
088  @BeforeEach
089  public void setup() throws Exception {
090    testUtil = new HBaseTestingUtil();
091    conf = testUtil.getConfiguration();
092    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
093      StopBlockingRegionObserver.class.getName());
094    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
095      StopBlockingRegionObserver.class.getName());
096    // make sure we have multiple blocks so that the client does not prefetch all block locations
097    conf.set("dfs.blocksize", Long.toString(100 * 1024));
098    // prefetch the first block
099    conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024));
100    conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName());
101
102    testUtil.startMiniZKCluster();
103    dfsCluster = testUtil.startMiniDFSCluster(2);
104    StartTestingClusterOption option =
105      StartTestingClusterOption.builder().numRegionServers(2).build();
106    cluster = testUtil.startMiniHBaseCluster(option);
107  }
108
109  @AfterEach
110  public void tearDown() throws Exception {
111    String className = StopBlockingRegionObserver.class.getName();
112    for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
113      HRegionServer rs = t.getRegionServer();
114      RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
115      StopBlockingRegionObserver cp =
116        (StopBlockingRegionObserver) cpHost.findCoprocessor(className);
117      cp.setStopAllowed(true);
118    }
119    testUtil.shutdownMiniCluster();
120  }
121
122  /**
123   * Test that a regionserver is able to abort properly, even when a coprocessor throws an exception
124   * in preStopRegionServer().
125   */
126  @Test
127  public void testAbortFromRPC() throws Exception {
128    TableName tableName = TableName.valueOf("testAbortFromRPC");
129    // create a test table
130    Table table = testUtil.createTable(tableName, FAMILY_BYTES);
131
132    // write some edits
133    testUtil.loadTable(table, FAMILY_BYTES);
134    LOG.info("Wrote data");
135    // force a flush
136    cluster.flushcache(tableName);
137    LOG.info("Flushed table");
138
139    // Send a poisoned put to trigger the abort
140    Put put = new Put(new byte[] { 0, 0, 0, 0 });
141    put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[] {});
142    put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[] { 1 });
143
144    List<HRegion> regions = cluster.findRegionsForTable(tableName);
145    HRegion firstRegion = cluster.findRegionsForTable(tableName).get(0);
146    table.put(put);
147    // Verify that the regionserver is stopped
148    assertNotNull(firstRegion);
149    assertNotNull(firstRegion.getRegionServerServices());
150    LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted());
151    assertTrue(firstRegion.getRegionServerServices().isAborted());
152    LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped());
153    assertTrue(firstRegion.getRegionServerServices().isStopped());
154  }
155
156  /**
157   * Test that a coprocessor is able to override a normal regionserver stop request.
158   */
159  @Test
160  public void testStopOverrideFromCoprocessor() throws Exception {
161    Admin admin = testUtil.getAdmin();
162    HRegionServer regionserver = cluster.getRegionServer(0);
163    admin.stopRegionServer(regionserver.getServerName().getAddress().toString());
164
165    // regionserver should have failed to stop due to coprocessor
166    assertFalse(cluster.getRegionServer(0).isAborted());
167    assertFalse(cluster.getRegionServer(0).isStopped());
168  }
169
170  /**
171   * Tests that only a single abort is processed when multiple aborts are requested.
172   */
173  @Test
174  public void testMultiAbort() {
175    assertTrue(cluster.getRegionServerThreads().size() > 0);
176    JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0);
177    assertTrue(t.isAlive());
178    HRegionServer rs = t.getRegionServer();
179    assertFalse(rs.isAborted());
180    RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
181    StopBlockingRegionObserver cp = (StopBlockingRegionObserver) cpHost
182      .findCoprocessor(StopBlockingRegionObserver.class.getName());
183    // Enable clean abort.
184    cp.setStopAllowed(true);
185    // Issue two aborts in quick succession.
186    // We need a thread pool here, otherwise the abort() runs into SecurityException when running
187    // from the fork join pool when setting the context classloader.
188    ExecutorService executor = Executors.newFixedThreadPool(2);
189    try {
190      CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor);
191      CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor);
192      long testTimeoutMs = 10 * 1000;
193      Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate<Exception>) rs::isStopped);
194      // Make sure only one abort is received.
195      assertEquals(1, cp.getNumAbortsRequested());
196    } finally {
197      executor.shutdownNow();
198    }
199  }
200
201  @CoreCoprocessor
202  public static class StopBlockingRegionObserver
203    implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver {
204    public static final String DO_ABORT = "DO_ABORT";
205    private boolean stopAllowed;
206    private AtomicInteger abortCount = new AtomicInteger();
207
208    @Override
209    public Optional<RegionObserver> getRegionObserver() {
210      return Optional.of(this);
211    }
212
213    @Override
214    public Optional<RegionServerObserver> getRegionServerObserver() {
215      return Optional.of(this);
216    }
217
218    @Override
219    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
220      WALEdit edit, Durability durability) throws IOException {
221      if (put.getAttribute(DO_ABORT) != null) {
222        // TODO: Change this so it throws a CP Abort Exception instead.
223        RegionServerServices rss =
224          ((HasRegionServerServices) c.getEnvironment()).getRegionServerServices();
225        String str = "Aborting for test";
226        LOG.info(str + " " + rss.getServerName());
227        rss.abort(str, new Throwable(str));
228      }
229    }
230
231    @Override
232    public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
233      throws IOException {
234      abortCount.incrementAndGet();
235      if (!stopAllowed) {
236        throw new IOException("Stop not allowed");
237      }
238    }
239
240    public int getNumAbortsRequested() {
241      return abortCount.get();
242    }
243
244    public void setStopAllowed(boolean allowed) {
245      this.stopAllowed = allowed;
246    }
247  }
248
249  /**
250   * Throws an exception during store file refresh in order to trigger a regionserver abort.
251   */
252  public static class ErrorThrowingHRegion extends HRegion {
253    public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
254      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
255      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
256    }
257
258    public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
259      TableDescriptor htd, RegionServerServices rsServices) {
260      super(fs, wal, confParam, htd, rsServices);
261    }
262
263    @Override
264    protected boolean refreshStoreFiles(boolean force) throws IOException {
265      // forced when called through RegionScannerImpl.handleFileNotFound()
266      if (force) {
267        throw new IOException("Failing file refresh for testing");
268      }
269      return super.refreshStoreFiles(force);
270    }
271  }
272}