001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.atomic.AtomicInteger; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.MiniHBaseCluster; 039import org.apache.hadoop.hbase.StartMiniClusterOption; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Waiter; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Durability; 044import 
org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests around regionserver shutdown and abort.
 * <p>
 * Each test runs against a fresh mini cluster (ZooKeeper, a 2-datanode DFS and 2 region servers)
 * in which {@link StopBlockingRegionObserver} is configured both as a region server coprocessor
 * and as a region coprocessor, and {@link ErrorThrowingHRegion} is the region implementation.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionServerAbort {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionServerAbort.class);

  private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAbort.class);

  private HBaseTestingUtility testUtil;
  private Configuration conf;
  private MiniDFSCluster dfsCluster;
  private MiniHBaseCluster cluster;

  /**
   * Starts ZooKeeper, a 2-datanode DFS and a 2-regionserver HBase cluster with the test
   * coprocessor and region implementation configured.
   */
  @Before
  public void setup() throws Exception {
    testUtil = new HBaseTestingUtility();
    conf = testUtil.getConfiguration();
    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
      StopBlockingRegionObserver.class.getName());
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      StopBlockingRegionObserver.class.getName());
    // make sure we have multiple blocks so that the client does not prefetch all block locations
    conf.set("dfs.blocksize", Long.toString(100 * 1024));
    // prefetch the first block
    conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024));
    conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName());

    testUtil.startMiniZKCluster();
    dfsCluster = testUtil.startMiniDFSCluster(2);
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
    cluster = testUtil.startMiniHBaseCluster(option);
  }

  /**
   * Lifts the coprocessor's stop veto on every region server (and on the active master, when it
   * hosts the observer) so that the mini cluster can actually shut down, then shuts it down.
   */
  @After
  public void tearDown() throws Exception {
    // cluster is still null when setup() failed before the HBase cluster came up; skip straight
    // to the shutdown so a secondary NPE here does not mask the original failure.
    if (cluster != null) {
      for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
        allowStop(t.getRegionServer().getRegionServerCoprocessorHost());
      }
      HMaster master = cluster.getMaster();
      if (master != null) {
        allowStop(master.getRegionServerCoprocessorHost());
      }
    }
    testUtil.shutdownMiniCluster();
  }

  /**
   * Calls {@code setStopAllowed(true)} on the {@link StopBlockingRegionObserver} loaded in the
   * given host. Does nothing when the host is null or the observer is not loaded in it.
   */
  private static void allowStop(RegionServerCoprocessorHost host) {
    if (host == null) {
      return;
    }
    String className = StopBlockingRegionObserver.class.getName();
    StopBlockingRegionObserver obs = (StopBlockingRegionObserver) host.findCoprocessor(className);
    if (obs != null) {
      obs.setStopAllowed(true);
    }
  }

  /**
   * Test that a regionserver is able to abort properly, even when a coprocessor throws an exception
   * in preStopRegionServer().
   */
  @Test
  public void testAbortFromRPC() throws Exception {
    TableName tableName = TableName.valueOf("testAbortFromRPC");
    // create a test table; the client-side Table handle is closed when the test finishes
    try (Table table = testUtil.createTable(tableName, FAMILY_BYTES)) {
      // write some edits
      testUtil.loadTable(table, FAMILY_BYTES);
      LOG.info("Wrote data");
      // force a flush
      cluster.flushcache(tableName);
      LOG.info("Flushed table");

      // Send a poisoned put to trigger the abort
      Put put = new Put(new byte[] { 0, 0, 0, 0 });
      put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[] {});
      put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[] { 1 });

      // Grab the region (and, through it, its hosting regionserver) before the abort happens.
      List<HRegion> regions = cluster.findRegionsForTable(tableName);
      assertFalse("No regions found for " + tableName, regions.isEmpty());
      HRegion firstRegion = regions.get(0);
      table.put(put);

      // Verify that the regionserver is stopped
      RegionServerServices rss = firstRegion.getRegionServerServices();
      assertNotNull(rss);
      LOG.info("isAborted = {}", rss.isAborted());
      assertTrue(rss.isAborted());
      LOG.info("isStopped = {}", rss.isStopped());
      assertTrue(rss.isStopped());
    }
  }

  /**
   * Test that a coprocessor is able to override a normal regionserver stop request.
   */
  @Test
  public void testStopOverrideFromCoprocessor() throws Exception {
    Admin admin = testUtil.getAdmin();
    HRegionServer regionserver = cluster.getRegionServer(0);
    admin.stopRegionServer(regionserver.getServerName().getAddress().toString());

    // regionserver should have failed to stop due to coprocessor
    assertFalse(regionserver.isAborted());
    assertFalse(regionserver.isStopped());
  }

  /**
   * Tests that only a single abort is processed when multiple aborts are requested.
   */
  @Test
  public void testMultiAbort() {
    assertFalse(cluster.getRegionServerThreads().isEmpty());
    JVMClusterUtil.RegionServerThread t = cluster.getRegionServerThreads().get(0);
    assertTrue(t.isAlive());
    HRegionServer rs = t.getRegionServer();
    assertFalse(rs.isAborted());
    RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
    StopBlockingRegionObserver cp = (StopBlockingRegionObserver) cpHost
        .findCoprocessor(StopBlockingRegionObserver.class.getName());
    // Enable clean abort.
    cp.setStopAllowed(true);
    // Issue two aborts in quick succession.
    // We need a thread pool here, otherwise the abort() runs into SecurityException when running
    // from the fork join pool when setting the context classloader.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      CompletableFuture.runAsync(() -> rs.abort("Abort 1"), executor);
      CompletableFuture.runAsync(() -> rs.abort("Abort 2"), executor);
      long testTimeoutMs = 10 * 1000;
      Waiter.waitFor(cluster.getConf(), testTimeoutMs, (Waiter.Predicate<Exception>) rs::isStopped);
      // Make sure only one abort is received.
      assertEquals(1, cp.getNumAbortsRequested());
    } finally {
      executor.shutdownNow();
    }
  }

  /**
   * Coprocessor used at both region server and region level. It vetoes region server stops by
   * throwing from {@link #preStopRegionServer} until {@link #setStopAllowed(boolean)} is called
   * with {@code true}. It counts every {@code preStopRegionServer} invocation, exposed through
   * {@link #getNumAbortsRequested()}. It also aborts the hosting region server when it sees a
   * {@link Put} that carries the {@link #DO_ABORT} attribute.
   */
  @CoreCoprocessor
  public static class StopBlockingRegionObserver
      implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver {
    /** Put attribute that, when present, makes {@link #prePut} abort the region server. */
    public static final String DO_ABORT = "DO_ABORT";
    // Written by the test thread but read in preStopRegionServer(), which runs on whichever
    // thread requested the stop (e.g. the executor threads in testMultiAbort); volatile makes
    // the write visible to those threads.
    private volatile boolean stopAllowed;
    private final AtomicInteger abortCount = new AtomicInteger();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public Optional<RegionServerObserver> getRegionServerObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
        Durability durability) throws IOException {
      if (put.getAttribute(DO_ABORT) != null) {
        // TODO: Change this so it throws a CP Abort Exception instead.
        RegionServerServices rss =
            ((HasRegionServerServices) c.getEnvironment()).getRegionServerServices();
        String str = "Aborting for test";
        LOG.info(str + " " + rss.getServerName());
        rss.abort(str, new Throwable(str));
      }
    }

    /**
     * Counts the stop attempt and rejects it with an {@link IOException} unless stopping has been
     * allowed via {@link #setStopAllowed(boolean)}.
     */
    @Override
    public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
        throws IOException {
      abortCount.incrementAndGet();
      if (!stopAllowed) {
        throw new IOException("Stop not allowed");
      }
    }

    /** Returns how many times {@link #preStopRegionServer} has been invoked. */
    public int getNumAbortsRequested() {
      return abortCount.get();
    }

    /** Controls whether {@link #preStopRegionServer} lets the stop proceed. */
    public void setStopAllowed(boolean allowed) {
      this.stopAllowed = allowed;
    }
  }

  /**
   * Throws an exception during store file refresh in order to trigger a regionserver abort.
   * <p>
   * Only forced refreshes fail; unforced refreshes are delegated to {@link HRegion}.
   */
  public static class ErrorThrowingHRegion extends HRegion {
    public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
        RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
    }

    public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(fs, wal, confParam, htd, rsServices);
    }

    @Override
    protected boolean refreshStoreFiles(boolean force) throws IOException {
      // forced when called through RegionScannerImpl.handleFileNotFound()
      if (force) {
        throw new IOException("Failing file refresh for testing");
      }
      return super.refreshStoreFiles(force);
    }
  }
}