/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests around regionserver shutdown and abort.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestRegionServerAbort {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionServerAbort.class);

  private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAbort.class);

  private HBaseTestingUtility testUtil;
  private Configuration conf;
  private MiniDFSCluster dfsCluster;
  private MiniHBaseCluster cluster;

  @Before
  public void setup() throws Exception {
    testUtil = new HBaseTestingUtility();
    conf = testUtil.getConfiguration();
    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
        StopBlockingRegionObserver.class.getName());
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        StopBlockingRegionObserver.class.getName());
    // make sure we have multiple blocks so that the client does not prefetch all block locations
    conf.set("dfs.blocksize", Long.toString(100 * 1024));
    // prefetch the first block
    conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024));
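    // Use ErrorThrowingHRegion (defined below) as the region implementation: it throws on a
    // forced store file refresh, in order to trigger a regionserver abort.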
    conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName());

    testUtil.startMiniZKCluster();
    dfsCluster = testUtil.startMiniDFSCluster(2);
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
    cluster = testUtil.startMiniHBaseCluster(option);
  }

  @After
  public void tearDown() throws Exception {
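    // Allow the blocking coprocessor to accept stop requests again so the mini cluster can shut
    // down cleanly.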
    String className = StopBlockingRegionObserver.class.getName();
    for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
      HRegionServer rs = t.getRegionServer();
      RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
      StopBlockingRegionObserver cp =
          (StopBlockingRegionObserver) cpHost.findCoprocessor(className);
      cp.setStopAllowed(true);
    }
    HMaster master = cluster.getMaster();
    RegionServerCoprocessorHost host = master.getRegionServerCoprocessorHost();
    if (host != null) {
      StopBlockingRegionObserver obs =
          (StopBlockingRegionObserver) host.findCoprocessor(className);
      if (obs != null) {
        obs.setStopAllowed(true);
      }
    }
    testUtil.shutdownMiniCluster();
  }

  /**
   * Test that a regionserver is able to abort properly, even when a coprocessor
   * throws an exception in preStopRegionServer().
   */
  @Test
  public void testAbortFromRPC() throws Exception {
    TableName tableName = TableName.valueOf("testAbortFromRPC");
    // create a test table
    Table table = testUtil.createTable(tableName, FAMILY_BYTES);

    // write some edits
    testUtil.loadTable(table, FAMILY_BYTES);
    LOG.info("Wrote data");
    // force a flush
    cluster.flushcache(tableName);
    LOG.info("Flushed table");

    // Send a poisoned put to trigger the abort
    Put put = new Put(new byte[]{0, 0, 0, 0});
    put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[]{});
    put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[]{1});

    List<HRegion> regions = cluster.findRegionsForTable(tableName);
    HRegion firstRegion = regions.get(0);
    table.put(put);
    // Verify that the regionserver is stopped
    assertNotNull(firstRegion);
    assertNotNull(firstRegion.getRegionServerServices());
    LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted());
    assertTrue(firstRegion.getRegionServerServices().isAborted());
    LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped());
    assertTrue(firstRegion.getRegionServerServices().isStopped());
  }

  /**
   * Test that a coprocessor is able to override a normal regionserver stop request.
   */
  @Test
  public void testStopOverrideFromCoprocessor() throws Exception {
    Admin admin = testUtil.getAdmin();
    HRegionServer regionserver = cluster.getRegionServer(0);
    admin.stopRegionServer(regionserver.getServerName().getHostAndPort());

    // regionserver should have failed to stop due to coprocessor
    assertFalse(cluster.getRegionServer(0).isAborted());
    assertFalse(cluster.getRegionServer(0).isStopped());
  }

  /**
   * Tests that only a single abort is processed when multiple aborts are requested.
   */
  @Test
  public void testMultiAbort() throws Exception {
    assertTrue(cluster.getRegionServerThreads().size() > 0);
    JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0);
    assertTrue(t.isAlive());
    HRegionServer rs = t.getRegionServer();
    assertFalse(rs.isAborted());
    RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
    StopBlockingRegionObserver cp = (StopBlockingRegionObserver) cpHost.findCoprocessor(
        StopBlockingRegionObserver.class.getName());
    // Enable clean abort.
    cp.setStopAllowed(true);
    // Issue two aborts in quick succession.
    // We need a dedicated thread pool here; running abort() from the common fork-join pool hits a
    // SecurityException when it tries to set the context classloader.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor);
      CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor);
      long testTimeoutMs = 10 * 1000;
      Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate<Exception>) rs::isStopped);
      // Make sure only one abort is received.
      assertEquals(1, cp.getNumAbortsRequested());
    } finally {
      executor.shutdownNow();
    }
  }

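  /**
   * Region/regionserver coprocessor used by these tests: it rejects regionserver stop requests in
   * preStopRegionServer() until explicitly allowed, counts how many stop/abort requests it has
   * seen, and aborts the hosting regionserver when a Put carries the DO_ABORT attribute.
   */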
  @CoreCoprocessor
  public static class StopBlockingRegionObserver
      implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver {
    public static final String DO_ABORT = "DO_ABORT";
    // Written by the test thread and read by regionserver threads, so keep it volatile.
    private volatile boolean stopAllowed;
    private final AtomicInteger abortCount = new AtomicInteger();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public Optional<RegionServerObserver> getRegionServerObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
                       Durability durability) throws IOException {
      if (put.getAttribute(DO_ABORT) != null) {
        // TODO: Change this so it throws a CP Abort Exception instead.
        RegionServerServices rss =
            ((HasRegionServerServices) c.getEnvironment()).getRegionServerServices();
        String str = "Aborting for test";
        LOG.info(str + " " + rss.getServerName());
        rss.abort(str, new Throwable(str));
      }
    }

    @Override
    public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
        throws IOException {
      abortCount.incrementAndGet();
      if (!stopAllowed) {
        throw new IOException("Stop not allowed");
      }
    }

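    /** Returns the number of times preStopRegionServer() has been invoked. */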
    public int getNumAbortsRequested() {
      return abortCount.get();
    }

    public void setStopAllowed(boolean allowed) {
      this.stopAllowed = allowed;
    }
  }

  /**
   * Throws an exception during store file refresh in order to trigger a regionserver abort.
   */
  public static class ErrorThrowingHRegion extends HRegion {
    public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
                                RegionInfo regionInfo, TableDescriptor htd,
                                RegionServerServices rsServices) {
      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
    }

    public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
                                TableDescriptor htd, RegionServerServices rsServices) {
      super(fs, wal, confParam, htd, rsServices);
    }

    @Override
    protected boolean refreshStoreFiles(boolean force) throws IOException {
      // forced when called through RegionScannerImpl.handleFileNotFound()
      if (force) {
        throw new IOException("Failing file refresh for testing");
      }
      return super.refreshStoreFiles(force);
    }
  }
}