001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.MiniHBaseCluster; 034import org.apache.hadoop.hbase.StartMiniClusterOption; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.Durability; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 044import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 045import org.apache.hadoop.hbase.coprocessor.ObserverContext; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.RegionObserver; 049import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 050import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 051import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 052import org.apache.hadoop.hbase.master.HMaster; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.testclassification.RegionServerTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.JVMClusterUtil; 057import org.apache.hadoop.hbase.wal.WAL; 058import org.apache.hadoop.hbase.wal.WALEdit; 059import org.apache.hadoop.hdfs.DFSConfigKeys; 060import org.apache.hadoop.hdfs.MiniDFSCluster; 061import org.junit.After; 062import org.junit.Before; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069/** 070 * Tests around regionserver shutdown and abort 071 */ 072@Category({RegionServerTests.class, MediumTests.class}) 073public class TestRegionServerAbort { 074 075 @ClassRule 076 public static final HBaseClassTestRule CLASS_RULE = 077 HBaseClassTestRule.forClass(TestRegionServerAbort.class); 078 079 private static final byte[] FAMILY_BYTES = Bytes.toBytes("f"); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAbort.class); 082 083 private HBaseTestingUtility testUtil; 084 private Configuration conf; 085 private MiniDFSCluster dfsCluster; 086 private MiniHBaseCluster cluster; 087 088 @Before 089 public void setup() throws Exception { 090 testUtil = new HBaseTestingUtility(); 091 conf = testUtil.getConfiguration(); 092 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 093 StopBlockingRegionObserver.class.getName()); 094 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 095 StopBlockingRegionObserver.class.getName()); 096 // make sure we have multiple blocks so that the client does not prefetch all block locations 097 conf.set("dfs.blocksize", Long.toString(100 * 1024)); 098 // prefetch the first block 099 conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024)); 100 conf.set(HConstants.REGION_IMPL, ErrorThrowingHRegion.class.getName()); 101 102 testUtil.startMiniZKCluster(); 103 dfsCluster = testUtil.startMiniDFSCluster(2); 104 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build(); 105 cluster = testUtil.startMiniHBaseCluster(option); 106 } 107 108 @After 109 public void tearDown() throws Exception { 110 String className = StopBlockingRegionObserver.class.getName(); 111 for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { 112 HRegionServer rs = t.getRegionServer(); 113 RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost(); 114 StopBlockingRegionObserver cp = (StopBlockingRegionObserver)cpHost.findCoprocessor(className); 115 cp.setStopAllowed(true); 116 } 117 HMaster master = cluster.getMaster(); 118 RegionServerCoprocessorHost host = master.getRegionServerCoprocessorHost(); 119 if (host != null) { 120 StopBlockingRegionObserver obs = (StopBlockingRegionObserver) host.findCoprocessor(className); 121 if (obs != null) obs.setStopAllowed(true); 122 } 123 testUtil.shutdownMiniCluster(); 124 } 125 126 /** 127 * Test that a regionserver is able to abort properly, even when a coprocessor 128 * throws an exception in preStopRegionServer(). 129 */ 130 @Test 131 public void testAbortFromRPC() throws Exception { 132 TableName tableName = TableName.valueOf("testAbortFromRPC"); 133 // create a test table 134 Table table = testUtil.createTable(tableName, FAMILY_BYTES); 135 136 // write some edits 137 testUtil.loadTable(table, FAMILY_BYTES); 138 LOG.info("Wrote data"); 139 // force a flush 140 cluster.flushcache(tableName); 141 LOG.info("Flushed table"); 142 143 // Send a poisoned put to trigger the abort 144 Put put = new Put(new byte[]{0, 0, 0, 0}); 145 put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[]{}); 146 put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[]{1}); 147 148 List<HRegion> regions = cluster.findRegionsForTable(tableName); 149 HRegion firstRegion = cluster.findRegionsForTable(tableName).get(0); 150 table.put(put); 151 // Verify that the regionserver is stopped 152 assertNotNull(firstRegion); 153 assertNotNull(firstRegion.getRegionServerServices()); 154 LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted()); 155 assertTrue(firstRegion.getRegionServerServices().isAborted()); 156 LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped()); 157 assertTrue(firstRegion.getRegionServerServices().isStopped()); 158 } 159 160 /** 161 * Test that a coprocessor is able to override a normal regionserver stop request. 162 */ 163 @Test 164 public void testStopOverrideFromCoprocessor() throws Exception { 165 Admin admin = testUtil.getHBaseAdmin(); 166 HRegionServer regionserver = cluster.getRegionServer(0); 167 admin.stopRegionServer(regionserver.getServerName().getHostAndPort()); 168 169 // regionserver should have failed to stop due to coprocessor 170 assertFalse(cluster.getRegionServer(0).isAborted()); 171 assertFalse(cluster.getRegionServer(0).isStopped()); 172 } 173 174 @CoreCoprocessor 175 public static class StopBlockingRegionObserver 176 implements RegionServerCoprocessor, RegionCoprocessor, RegionServerObserver, RegionObserver { 177 public static final String DO_ABORT = "DO_ABORT"; 178 private boolean stopAllowed; 179 180 @Override 181 public Optional<RegionObserver> getRegionObserver() { 182 return Optional.of(this); 183 } 184 185 @Override 186 public Optional<RegionServerObserver> getRegionServerObserver() { 187 return Optional.of(this); 188 } 189 190 @Override 191 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, 192 Durability durability) throws IOException { 193 if (put.getAttribute(DO_ABORT) != null) { 194 // TODO: Change this so it throws a CP Abort Exception instead. 195 RegionServerServices rss = 196 ((HasRegionServerServices)c.getEnvironment()).getRegionServerServices(); 197 String str = "Aborting for test"; 198 LOG.info(str + " " + rss.getServerName()); 199 rss.abort(str, new Throwable(str)); 200 } 201 } 202 203 @Override 204 public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env) 205 throws IOException { 206 if (!stopAllowed) { 207 throw new IOException("Stop not allowed"); 208 } 209 } 210 211 public void setStopAllowed(boolean allowed) { 212 this.stopAllowed = allowed; 213 } 214 } 215 216 /** 217 * Throws an exception during store file refresh in order to trigger a regionserver abort. 218 */ 219 public static class ErrorThrowingHRegion extends HRegion { 220 public ErrorThrowingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 221 RegionInfo regionInfo, TableDescriptor htd, 222 RegionServerServices rsServices) { 223 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 224 } 225 226 public ErrorThrowingHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 227 TableDescriptor htd, RegionServerServices rsServices) { 228 super(fs, wal, confParam, htd, rsServices); 229 } 230 231 @Override 232 protected boolean refreshStoreFiles(boolean force) throws IOException { 233 // forced when called through RegionScannerImpl.handleFileNotFound() 234 if (force) { 235 throw new IOException("Failing file refresh for testing"); 236 } 237 return super.refreshStoreFiles(force); 238 } 239 } 240}