/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
 * Tests for {@link SplitWALManager} running against a mini cluster with three region servers,
 * ZooKeeper-coordinated WAL splitting disabled and {@code HBASE_SPLIT_WAL_MAX_SPLITTER} set to 1.
 * <p>
 * With that setup the tests expect exactly three split WAL workers to be acquirable at once; a
 * fourth acquisition must suspend the calling procedure until a worker is released or a new
 * region server joins.
 */
@Category({ MasterTests.class, LargeTests.class })
public class TestSplitWALManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSplitWALManager.class);

  private static HBaseTestingUtility TEST_UTIL;
  private HMaster master;
  private SplitWALManager splitWALManager;
  private TableName TABLE_NAME;
  private byte[] FAMILY;

  /** Starts a fresh three region server mini cluster before every test. */
  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
    TEST_UTIL.startMiniCluster(3);
    master = TEST_UTIL.getHBaseCluster().getMaster();
    splitWALManager = master.getSplitWALManager();
    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
    FAMILY = Bytes.toBytes("test");
  }

  /** Shuts the mini cluster down after every test. */
  @After
  public void teardown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Acquires all three workers, verifies that a fourth acquisition suspends, then releases one
   * worker and verifies the fourth procedure can now acquire it.
   */
  @Test
  public void testAcquireAndRelease() throws Exception {
    List<FakeServerProcedure> testProcedures = createMetaServerProcedures(4);
    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
    Assert.assertNotNull(server);
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));

    assertAcquireSuspended(testProcedures.get(3));

    splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
      .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
  }

  /**
   * Acquires all three workers, verifies that a fourth acquisition suspends, then starts an
   * additional region server and verifies the fourth procedure can acquire a worker.
   */
  @Test
  public void testAddNewServer() throws Exception {
    List<FakeServerProcedure> testProcedures = createMetaServerProcedures(4);
    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
    Assert.assertNotNull(server);
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));

    assertAcquireSuspended(testProcedures.get(3));

    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
    newServer.waitForServerOnline();
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
  }

  /**
   * Creates split procedures for the single meta WAL and the single non-meta WAL of the server
   * holding meta, runs each of them and checks that the WAL file is gone afterwards.
   */
  @Test
  public void testCreateSplitWALProcedures() throws Exception {
    createAndLoadTable();
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
      AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
    // Test splitting meta wal
    FileStatus[] wals =
      TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
    Assert.assertEquals(1, wals.length);
    List<Procedure> testProcedures =
      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
    Assert.assertEquals(1, testProcedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));

    // Test splitting wal
    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
    Assert.assertEquals(1, wals.length);
    testProcedures =
      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
    Assert.assertEquals(1, testProcedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
  }

  /**
   * Submits three procedures that each hold a worker, then a fourth one that must stay suspended
   * until one of the first three is allowed to finish and release its worker.
   */
  @Test
  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    List<FakeServerProcedure> testProcedures = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      FakeServerProcedure procedure =
        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
      testProcedures.add(procedure);
      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
    }
    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
    FakeServerProcedure failedProcedure =
      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
      HConstants.NO_NONCE);
    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
    Assert.assertFalse(failedProcedure.isWorkerAcquired());
    // let one procedure finish and release worker
    testProcedures.get(0).countDown();
    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
    // The worker is released from inside the first procedure's last state, i.e. before that
    // procedure has returned from executeFromState. Wait for it to be marked successful instead
    // of asserting right away, otherwise the check races with the procedure executor.
    TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isSuccess());
    Assert.assertTrue(testProcedures.get(0).isSuccess());
  }

  /**
   * Checks that the server holding meta reports one meta WAL and one non-meta WAL to split, and
   * that another region server reports no meta WAL.
   */
  @Test
  public void testGetWALsToSplit() throws Exception {
    createAndLoadTable();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
    Assert.assertEquals(1, metaWals.size());
    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
    Assert.assertEquals(1, wals.size());
    ServerName testServer = findNonMetaServer(metaServer);
    metaWals = splitWALManager.getWALsToSplit(testServer, true);
    Assert.assertEquals(0, metaWals.size());
  }

  /**
   * Splits the WAL of a non-meta server and the meta WAL of the meta server through
   * {@link SplitWALManager#splitWALs}, checking the remaining WAL counts after each split.
   */
  @Test
  public void testSplitLogs() throws Exception {
    createAndLoadTable();
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    ServerName testServer = findNonMetaServer(metaServer);
    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
    Assert.assertEquals(1, procedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());

    procedures = splitWALManager.splitWALs(metaServer, true);
    Assert.assertEquals(1, procedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
  }

  /**
   * Lets three procedures acquire a worker each, restarts the master, and verifies that a newly
   * submitted procedure still cannot acquire a worker on the new master.
   */
  @Test
  public void testWorkerReloadWhenMasterRestart() throws Exception {
    List<FakeServerProcedure> testProcedures = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      FakeServerProcedure procedure =
        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
      testProcedures.add(procedure);
      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
    }
    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
    // Kill master
    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
    // restart master
    TEST_UTIL.getHBaseCluster().startMaster();
    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    this.master = TEST_UTIL.getHBaseCluster().getMaster();

    FakeServerProcedure failedProcedure =
      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
      HConstants.NO_NONCE, HConstants.NO_NONCE);
    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
    Assert.assertFalse(failedProcedure.isWorkerAcquired());
    for (int i = 0; i < 3; i++) {
      testProcedures.get(i).countDown();
    }
    failedProcedure.countDown();
  }

  /** Creates the test table and loads it with data so that the cluster has WAL entries. */
  private void createAndLoadTable() throws Exception {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
    // load table
    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
  }

  /** Creates {@code count} fake procedures that all target the server currently holding meta. */
  private List<FakeServerProcedure> createMetaServerProcedures(int count) throws IOException {
    List<FakeServerProcedure> procedures = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      procedures.add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
    }
    return procedures;
  }

  /** Asserts that acquiring a worker for {@code procedure} suspends it instead of succeeding. */
  private void assertAcquireSuspended(FakeServerProcedure procedure) throws Exception {
    try {
      splitWALManager.acquireSplitWALWorker(procedure);
      Assert.fail("Expected ProcedureSuspendedException because no split WAL worker is available");
    } catch (ProcedureSuspendedException expected) {
      // Expected: every worker is already taken, so the procedure has to wait.
    }
  }

  /**
   * Returns the name of any live region server other than {@code metaServer}. Server names are
   * compared with {@code equals} so the lookup does not depend on object identity.
   */
  private ServerName findNonMetaServer(ServerName metaServer) {
    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
      .map(rs -> rs.getRegionServer().getServerName())
      .filter(rs -> !rs.equals(metaServer)).findAny()
      .orElseThrow(() -> new AssertionError("No region server other than " + metaServer));
  }

  /**
   * Minimal split-WAL style procedure used to exercise worker acquisition and release. It
   * acquires a worker, blocks on a latch until the test calls {@link #countDown()}, and then
   * releases the worker.
   * <p>
   * Only the crashed server name (plus a dummy WAL path) is serialized; the acquired worker and
   * the latch are in-memory state of this instance.
   */
  public static final class FakeServerProcedure
    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
    implements ServerProcedureInterface {

    private ServerName serverName;
    // Written by the thread running executeFromState and polled by the test thread, so these
    // must be volatile for the test's waitFor predicates to reliably observe the updates.
    private volatile ServerName worker;
    private volatile boolean triedToAcquire = false;
    private final CountDownLatch barrier = new CountDownLatch(1);

    /** No-arg constructor; {@code serverName} is then set by {@link #deserializeStateData}. */
    public FakeServerProcedure() {
    }

    /**
     * @param serverName the server this procedure pretends to split WALs for
     */
    public FakeServerProcedure(ServerName serverName) {
      this.serverName = serverName;
    }

    @Override
    public ServerName getServerName() {
      return serverName;
    }

    @Override
    public boolean hasMetaTableRegion() {
      return false;
    }

    @Override
    public ServerOperationType getServerOperationType() {
      return SPLIT_WAL;
    }

    /**
     * Runs one step of the state machine: acquire a worker, wait for the test to open the
     * barrier, then release the worker.
     * @throws ProcedureSuspendedException if no worker is available in the acquire state
     */
    @Override
    protected Flow executeFromState(MasterProcedureEnv env,
      MasterProcedureProtos.SplitWALState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
      switch (state) {
        case ACQUIRE_SPLIT_WAL_WORKER:
          // Record the attempt first: the acquire call may throw when no worker is free.
          triedToAcquire = true;
          worker = splitWALManager.acquireSplitWALWorker(this);
          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
          return Flow.HAS_MORE_STATE;
        case DISPATCH_WAL_TO_WORKER:
          // Hold the worker until the test explicitly lets this procedure continue.
          barrier.await();
          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
          return Flow.HAS_MORE_STATE;
        case RELEASE_SPLIT_WORKER:
          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    }

    /** @return true once a worker has been assigned to this procedure */
    public boolean isWorkerAcquired() {
      return worker != null;
    }

    /** @return true once this procedure has reached the acquire state at least once */
    public boolean isTriedToAcquire() {
      return triedToAcquire;
    }

    /** Opens the barrier so the procedure can move on to releasing its worker. */
    public void countDown() {
      this.barrier.countDown();
    }

    @Override
    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
      throws IOException, InterruptedException {
      // Nothing to roll back for this fake procedure.
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
    }

    @Override
    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
      return state.getNumber();
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getInitialState() {
      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
    }

    @Override
    protected boolean holdLock(MasterProcedureEnv env) {
      return true;
    }

    @Override
    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
      // Nothing to roll back for this fake procedure.
    }

    @Override
    protected boolean abort(MasterProcedureEnv env) {
      return false;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      MasterProcedureProtos.SplitWALData.Builder builder =
        MasterProcedureProtos.SplitWALData.newBuilder();
      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
      serializer.serialize(builder.build());
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      MasterProcedureProtos.SplitWALData data =
        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
    }
  }
}