001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.atomic.AtomicInteger; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 038import org.apache.hadoop.hbase.StartTestingClusterOption; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Durability; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 048import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 049import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.coprocessor.RegionObserver; 054import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 055import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 056import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.testclassification.RegionServerTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.JVMClusterUtil; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hdfs.DFSConfigKeys; 064import org.apache.hadoop.hdfs.MiniDFSCluster; 065import org.junit.jupiter.api.AfterEach; 066import org.junit.jupiter.api.BeforeEach; 067import org.junit.jupiter.api.Tag; 068import org.junit.jupiter.api.Test; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072/** 073 * Tests around regionserver shutdown and abort 074 */ 075@Tag(RegionServerTests.TAG) 076@Tag(MediumTests.TAG) 077public class TestRegionServerAbort { 078 079 private static final byte[] FAMILY_BYTES = Bytes.toBytes("f"); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAbort.class); 082 083 private HBaseTestingUtil testUtil; 084 private Configuration conf; 085 private MiniDFSCluster dfsCluster; 086 private SingleProcessHBaseCluster cluster; 087 088 @BeforeEach 089 public void setup() throws Exception { 090 testUtil = new HBaseTestingUtil(); 091 conf = testUtil.getConfiguration(); 092 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 093 StopBlockingRegionObserver.class.getName()); 094 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 095 StopBlockingRegionObserver.class.getName()); 096 // make sure we have multiple blocks so that the client does not prefetch all block locations 097 conf.set("dfs.blocksize", Long.toString(100 * 1024)); 098 // prefetch the first block 099 conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024)); 100 conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName()); 101 102 testUtil.startMiniZKCluster(); 103 dfsCluster = testUtil.startMiniDFSCluster(2); 104 StartTestingClusterOption option = 105 StartTestingClusterOption.builder().numRegionServers(2).build(); 106 cluster = testUtil.startMiniHBaseCluster(option); 107 } 108 109 @AfterEach 110 public void tearDown() throws Exception { 111 String className = StopBlockingRegionObserver.class.getName(); 112 for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { 113 HRegionServer rs = t.getRegionServer(); 114 RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost(); 115 StopBlockingRegionObserver cp = 116 (StopBlockingRegionObserver) cpHost.findCoprocessor(className); 117 cp.setStopAllowed(true); 118 } 119 testUtil.shutdownMiniCluster(); 120 } 121 122 /** 123 * Test that a regionserver is able to abort properly, even when a coprocessor throws an exception 124 * in preStopRegionServer(). 125 */ 126 @Test 127 public void testAbortFromRPC() throws Exception { 128 TableName tableName = TableName.valueOf("testAbortFromRPC"); 129 // create a test table 130 Table table = testUtil.createTable(tableName, FAMILY_BYTES); 131 132 // write some edits 133 testUtil.loadTable(table, FAMILY_BYTES); 134 LOG.info("Wrote data"); 135 // force a flush 136 cluster.flushcache(tableName); 137 LOG.info("Flushed table"); 138 139 // Send a poisoned put to trigger the abort 140 Put put = new Put(new byte[] { 0, 0, 0, 0 }); 141 put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[] {}); 142 put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[] { 1 }); 143 144 List<HRegion> regions = cluster.findRegionsForTable(tableName); 145 HRegion firstRegion = cluster.findRegionsForTable(tableName).get(0); 146 table.put(put); 147 // Verify that the regionserver is stopped 148 assertNotNull(firstRegion); 149 assertNotNull(firstRegion.getRegionServerServices()); 150 LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted()); 151 assertTrue(firstRegion.getRegionServerServices().isAborted()); 152 LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped()); 153 assertTrue(firstRegion.getRegionServerServices().isStopped()); 154 } 155 156 /** 157 * Test that a coprocessor is able to override a normal regionserver stop request. 158 */ 159 @Test 160 public void testStopOverrideFromCoprocessor() throws Exception { 161 Admin admin = testUtil.getAdmin(); 162 HRegionServer regionserver = cluster.getRegionServer(0); 163 admin.stopRegionServer(regionserver.getServerName().getAddress().toString()); 164 165 // regionserver should have failed to stop due to coprocessor 166 assertFalse(cluster.getRegionServer(0).isAborted()); 167 assertFalse(cluster.getRegionServer(0).isStopped()); 168 } 169 170 /** 171 * Tests that only a single abort is processed when multiple aborts are requested. 172 */ 173 @Test 174 public void testMultiAbort() { 175 assertTrue(cluster.getRegionServerThreads().size() > 0); 176 JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0); 177 assertTrue(t.isAlive()); 178 HRegionServer rs = t.getRegionServer(); 179 assertFalse(rs.isAborted()); 180 RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost(); 181 StopBlockingRegionObserver cp = (StopBlockingRegionObserver) cpHost 182 .findCoprocessor(StopBlockingRegionObserver.class.getName()); 183 // Enable clean abort. 184 cp.setStopAllowed(true); 185 // Issue two aborts in quick succession. 186 // We need a thread pool here, otherwise the abort() runs into SecurityException when running 187 // from the fork join pool when setting the context classloader. 188 ExecutorService executor = Executors.newFixedThreadPool(2); 189 try { 190 CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor); 191 CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor); 192 long testTimeoutMs = 10 * 1000; 193 Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate<Exception>) rs::isStopped); 194 // Make sure only one abort is received. 195 assertEquals(1, cp.getNumAbortsRequested()); 196 } finally { 197 executor.shutdownNow(); 198 } 199 } 200 201 @CoreCoprocessor 202 public static class StopBlockingRegionObserver 203 implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver { 204 public static final String DO_ABORT = "DO_ABORT"; 205 private boolean stopAllowed; 206 private AtomicInteger abortCount = new AtomicInteger(); 207 208 @Override 209 public Optional<RegionObserver> getRegionObserver() { 210 return Optional.of(this); 211 } 212 213 @Override 214 public Optional<RegionServerObserver> getRegionServerObserver() { 215 return Optional.of(this); 216 } 217 218 @Override 219 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 220 WALEdit edit, Durability durability) throws IOException { 221 if (put.getAttribute(DO_ABORT) != null) { 222 // TODO: Change this so it throws a CP Abort Exception instead. 223 RegionServerServices rss = 224 ((HasRegionServerServices) c.getEnvironment()).getRegionServerServices(); 225 String str = "Aborting for test"; 226 LOG.info(str + " " + rss.getServerName()); 227 rss.abort(str, new Throwable(str)); 228 } 229 } 230 231 @Override 232 public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env) 233 throws IOException { 234 abortCount.incrementAndGet(); 235 if (!stopAllowed) { 236 throw new IOException("Stop not allowed"); 237 } 238 } 239 240 public int getNumAbortsRequested() { 241 return abortCount.get(); 242 } 243 244 public void setStopAllowed(boolean allowed) { 245 this.stopAllowed = allowed; 246 } 247 } 248 249 /** 250 * Throws an exception during store file refresh in order to trigger a regionserver abort. 251 */ 252 public static class ErrorThrowingHRegion extends HRegion { 253 public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 254 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 255 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 256 } 257 258 public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 259 TableDescriptor htd, RegionServerServices rsServices) { 260 super(fs, wal, confParam, htd, rsServices); 261 } 262 263 @Override 264 protected boolean refreshStoreFiles(boolean force) throws IOException { 265 // forced when called through RegionScannerImpl.handleFileNotFound() 266 if (force) { 267 throw new IOException("Failing file refresh for testing"); 268 } 269 return super.refreshStoreFiles(force); 270 } 271 } 272}