/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
 * Exercises {@link SplitWALManager} against a mini cluster: acquiring and releasing split WAL
 * workers, creating and running split WAL procedures, listing the WALs to split, and the worker
 * bookkeeping seen by a restarted master.
 * <p>
 * Every test gets a fresh mini cluster (see {@link #setup()}). The tests rely on exactly three
 * worker slots being available at start-up: three acquisitions succeed and the fourth is expected
 * to suspend.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestSplitWALManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSplitWALManager.class);

  private static HBaseTestingUtility TEST_UTIL;
  private HMaster master;
  private SplitWALManager splitWALManager;
  private TableName TABLE_NAME;
  private byte[] FAMILY;

  /**
   * Starts a mini cluster with ZK-coordinated WAL splitting disabled and the max-splitter setting
   * at 1, then caches the active master and its {@link SplitWALManager}.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
    TEST_UTIL.startMiniCluster(3);
    master = TEST_UTIL.getHBaseCluster().getMaster();
    splitWALManager = master.getSplitWALManager();
    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
    FAMILY = Bytes.toBytes("test");
  }

  /** Shuts the mini cluster down after each test. */
  @After
  public void teardown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * With three workers taken a fourth acquisition must suspend; releasing one worker makes the
   * fourth acquisition succeed.
   */
  @Test
  public void testAcquireAndRelease() throws Exception {
    List<FakeServerProcedure> testProcedures = createMetaServerProcedures(4);
    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
    Assert.assertNotNull(server);
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));

    assertAcquireSuspends(testProcedures.get(3));

    splitWALManager.releaseSplitWALWorker(server,
      master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
  }

  /**
   * With three workers taken a fourth acquisition must suspend; once an additional region server
   * is online the fourth acquisition succeeds.
   */
  @Test
  public void testAddNewServer() throws Exception {
    List<FakeServerProcedure> testProcedures = createMetaServerProcedures(4);
    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
    Assert.assertNotNull(server);
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));

    assertAcquireSuspends(testProcedures.get(3));

    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
    newServer.waitForServerOnline();
    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
  }

  /**
   * Creates one split procedure for the meta WAL and one for the non-meta WAL of the server
   * holding meta, runs each, and checks that the WAL file is gone afterwards.
   */
  @Test
  public void testCreateSplitWALProcedures() throws Exception {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
    // load table
    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
      AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
    // Test splitting meta wal
    FileStatus[] wals =
      TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
    Assert.assertEquals(1, wals.length);
    List<Procedure> testProcedures =
      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
    Assert.assertEquals(1, testProcedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));

    // Test splitting wal
    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
    Assert.assertEquals(1, wals.length);
    testProcedures =
      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
    Assert.assertEquals(1, testProcedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
  }

  /**
   * Runs three procedures through the executor so that all workers are taken, submits a fourth
   * that must not get a worker, then lets the first finish and checks that the fourth obtains the
   * released worker.
   */
  @Test
  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    List<FakeServerProcedure> testProcedures = submitProcedurePerRegionServer(masterPE, 3);
    // Wait for every procedure, not only the last one, so that all workers are really taken
    // before the next procedure is submitted.
    TEST_UTIL.waitFor(10000,
      () -> testProcedures.stream().allMatch(FakeServerProcedure::isWorkerAcquired));
    FakeServerProcedure failedProcedure =
      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
      HConstants.NO_NONCE);
    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
    Assert.assertFalse(failedProcedure.isWorkerAcquired());
    // let one procedure finish and release worker
    testProcedures.get(0).countDown();
    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
    // The worker is released in the procedure's last step, so the executor may not have marked
    // the procedure as finished yet when the waiting procedure gets the worker.
    TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isSuccess());
    Assert.assertTrue(testProcedures.get(0).isSuccess());
  }

  /**
   * The server holding meta is expected to report one meta WAL and one non-meta WAL; another
   * region server is expected to report no meta WAL.
   */
  @Test
  public void testGetWALsToSplit() throws Exception {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
    // load table
    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
    Assert.assertEquals(1, metaWals.size());
    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
    Assert.assertEquals(1, wals.size());
    ServerName testServer = findServerNotHoldingMeta(metaServer);
    metaWals = splitWALManager.getWALsToSplit(testServer, true);
    Assert.assertEquals(0, metaWals.size());
  }

  /**
   * Splits the WAL of a server that does not hold meta, then the meta WAL of the server that
   * does, checking after each run which WALs remain to be split.
   */
  @Test
  public void testSplitLogs() throws Exception {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
    // load table
    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    ServerName testServer = findServerNotHoldingMeta(metaServer);
    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
    Assert.assertEquals(1, procedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());

    procedures = splitWALManager.splitWALs(metaServer, true);
    Assert.assertEquals(1, procedures.size());
    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
    // Only the meta WAL was split, the non-meta WAL of the same server must still be listed.
    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
  }

  /**
   * Takes all workers with three running procedures, kills and restarts the master, and expects
   * that a procedure submitted to the new master still cannot acquire a worker.
   */
  @Test
  public void testWorkerReloadWhenMasterRestart() throws Exception {
    List<FakeServerProcedure> testProcedures =
      submitProcedurePerRegionServer(master.getMasterProcedureExecutor(), 3);
    // All three procedures must hold a worker before the master goes down.
    TEST_UTIL.waitFor(10000,
      () -> testProcedures.stream().allMatch(FakeServerProcedure::isWorkerAcquired));
    // Kill master
    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
    // restart master
    TEST_UTIL.getHBaseCluster().startMaster();
    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    this.master = TEST_UTIL.getHBaseCluster().getMaster();

    FakeServerProcedure failedProcedure =
      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
      HConstants.NO_NONCE, HConstants.NO_NONCE);
    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
    Assert.assertFalse(failedProcedure.isWorkerAcquired());
    for (FakeServerProcedure procedure : testProcedures) {
      procedure.countDown();
    }
    failedProcedure.countDown();
  }

  /**
   * Builds {@code count} procedures that all target the server currently holding meta. The
   * procedures are only created, not submitted.
   */
  private List<FakeServerProcedure> createMetaServerProcedures(int count) throws Exception {
    List<FakeServerProcedure> procedures = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      procedures.add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
    }
    return procedures;
  }

  /**
   * Creates one procedure per region server with index {@code 0..count-1} and submits each to the
   * given executor without a nonce.
   */
  private List<FakeServerProcedure> submitProcedurePerRegionServer(
      ProcedureExecutor<MasterProcedureEnv> executor, int count) throws Exception {
    List<FakeServerProcedure> procedures = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      FakeServerProcedure procedure =
        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
      procedures.add(procedure);
      ProcedureTestingUtility.submitProcedure(executor, procedure, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
    }
    return procedures;
  }

  /**
   * Fails the test unless acquiring a worker for {@code procedure} throws
   * {@link ProcedureSuspendedException}. Any other exception propagates to the caller.
   */
  private void assertAcquireSuspends(FakeServerProcedure procedure) throws Exception {
    try {
      splitWALManager.acquireSplitWALWorker(procedure);
      Assert.fail("Expected ProcedureSuspendedException, but a split WAL worker was acquired");
    } catch (ProcedureSuspendedException expected) {
      // expected: no worker is available for this procedure
    }
  }

  /**
   * Returns the name of any live region server other than {@code metaServer}. Names are compared
   * by value, not by reference.
   */
  private ServerName findServerNotHoldingMeta(ServerName metaServer) throws Exception {
    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
      .map(rs -> rs.getRegionServer().getServerName())
      .filter(rs -> !rs.equals(metaServer))
      .findAny()
      .orElseThrow(() -> new AssertionError("No region server other than " + metaServer));
  }

  /**
   * Minimal server procedure that walks through the split WAL states: it acquires a worker,
   * blocks on a latch until {@link #countDown()} is called, and then releases the worker.
   * <p>
   * The procedure is executed on procedure executor threads while the test thread polls
   * {@link #isWorkerAcquired()} and {@link #isTriedToAcquire()}, hence the volatile fields.
   */
  public static final class FakeServerProcedure
    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
    implements ServerProcedureInterface {

    // Not final: restored by deserializeStateData after the no-arg constructor was used.
    private ServerName serverName;
    private volatile ServerName worker;
    private final CountDownLatch barrier = new CountDownLatch(1);
    private volatile boolean triedToAcquire = false;

    /** No-arg constructor; the server name is then filled in by deserialization. */
    public FakeServerProcedure() {
    }

    public FakeServerProcedure(ServerName serverName) {
      this.serverName = serverName;
    }

    @Override
    public ServerName getServerName() {
      return serverName;
    }

    @Override
    public boolean hasMetaTableRegion() {
      return false;
    }

    @Override
    public ServerOperationType getServerOperationType() {
      return SPLIT_WAL;
    }

    @Override
    protected Flow executeFromState(MasterProcedureEnv env,
        MasterProcedureProtos.SplitWALState state)
        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
      switch (state) {
        case ACQUIRE_SPLIT_WAL_WORKER:
          // Set the flag before the call: the acquisition may throw, and the tests need to see
          // that an attempt was made even when no worker was obtained.
          triedToAcquire = true;
          worker = splitWALManager.acquireSplitWALWorker(this);
          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
          return Flow.HAS_MORE_STATE;
        case DISPATCH_WAL_TO_WORKER:
          // Hold the worker until the test calls countDown().
          barrier.await();
          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
          return Flow.HAS_MORE_STATE;
        case RELEASE_SPLIT_WORKER:
          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    }

    /** @return true once a worker has been assigned to this procedure */
    public boolean isWorkerAcquired() {
      return worker != null;
    }

    /** @return true once this procedure has reached the worker acquisition step */
    public boolean isTriedToAcquire() {
      return triedToAcquire;
    }

    /** Unblocks the procedure so that it can go on to release its worker. */
    public void countDown() {
      this.barrier.countDown();
    }

    @Override
    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
        throws IOException, InterruptedException {
      // nothing to roll back
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
    }

    @Override
    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
      return state.getNumber();
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getInitialState() {
      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
    }

    @Override
    protected boolean holdLock(MasterProcedureEnv env) {
      return true;
    }

    @Override
    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
      // nothing to roll back
    }

    @Override
    protected boolean abort(MasterProcedureEnv env) {
      return false;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      MasterProcedureProtos.SplitWALData.Builder builder =
        MasterProcedureProtos.SplitWALData.newBuilder();
      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
      serializer.serialize(builder.build());
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      MasterProcedureProtos.SplitWALData data =
        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
    }
  }
}