001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertThrows; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.CountDownLatch; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 039import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 040import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 041import org.apache.hadoop.hbase.procedure2.Procedure; 042import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 043import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 044import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 046import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 047import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.testclassification.MasterTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.CommonFSUtils; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.JVMClusterUtil; 054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 055import org.junit.jupiter.api.AfterEach; 056import org.junit.jupiter.api.BeforeEach; 057import org.junit.jupiter.api.Tag; 058import org.junit.jupiter.api.Test; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 063 064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; 066 067@Tag(MasterTests.TAG) 068@Tag(LargeTests.TAG) 069public class TestSplitWALManager { 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class); 072 private static HBaseTestingUtil TEST_UTIL; 073 private HMaster master; 074 private SplitWALManager splitWALManager; 075 private TableName TABLE_NAME; 076 private byte[] FAMILY; 077 078 @BeforeEach 079 public void setUp() throws Exception { 080 TEST_UTIL = new HBaseTestingUtil(); 081 TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 082 TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 5); 083 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 084 TEST_UTIL.startMiniCluster(3); 085 master = TEST_UTIL.getHBaseCluster().getMaster(); 086 splitWALManager = master.getSplitWALManager(); 087 TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); 088 FAMILY = Bytes.toBytes("test"); 089 } 090 091 @AfterEach 092 public void tearDown() throws Exception { 093 TEST_UTIL.shutdownMiniCluster(); 094 } 095 096 @Test 097 public void testAcquireAndRelease() throws Exception { 098 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 099 for (int i = 0; i < 4; i++) { 100 testProcedures.add(new FakeServerProcedure( 101 ServerName.valueOf("server" + i, 12345, EnvironmentEdgeManager.currentTime()))); 102 } 103 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 104 procExec.submitProcedure(testProcedures.get(0)); 105 TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired()); 106 procExec.submitProcedure(testProcedures.get(1)); 107 procExec.submitProcedure(testProcedures.get(2)); 108 TEST_UTIL.waitFor(10000, 109 () -> testProcedures.get(1).isWorkerAcquired() && testProcedures.get(2).isWorkerAcquired()); 110 111 // should get a ProcedureSuspendedException, so it will try to acquire but can not get a worker 112 procExec.submitProcedure(testProcedures.get(3)); 113 TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire()); 114 for (int i = 0; i < 3; i++) { 115 Thread.sleep(1000); 116 assertFalse(testProcedures.get(3).isWorkerAcquired()); 117 } 118 119 // release a worker, the last procedure should be able to get a worker 120 testProcedures.get(0).countDown(); 121 TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired()); 122 123 for (int i = 1; i < 4; i++) { 124 testProcedures.get(i).countDown(); 125 } 126 for (int i = 0; i < 4; i++) { 127 final int index = i; 128 TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished()); 129 } 130 } 131 132 @Test 133 public void testAddNewServer() throws Exception { 134 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 135 for (int i = 0; i < 4; i++) { 136 testProcedures.add( 137 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName())); 138 } 139 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); 140 assertNotNull(server); 141 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); 142 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); 143 144 assertThrows(ProcedureSuspendedException.class, 145 () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 146 147 JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); 148 newServer.waitForServerOnline(); 149 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 150 } 151 152 @Test 153 public void testCreateSplitWALProcedures() throws Exception { 154 TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 155 // load table 156 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 157 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 158 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 159 Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), 160 AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); 161 // Test splitting meta wal 162 FileStatus[] wals = 163 TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); 164 assertEquals(1, wals.length); 165 List<Procedure> testProcedures = 166 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 167 assertEquals(1, testProcedures.size()); 168 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 169 assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 170 171 // Test splitting wal 172 wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); 173 assertEquals(1, wals.length); 174 testProcedures = 175 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 176 assertEquals(1, testProcedures.size()); 177 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 178 assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 179 } 180 181 @Test 182 public void testAcquireAndReleaseSplitWALWorker() throws Exception { 183 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 184 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 185 for (int i = 0; i < 3; i++) { 186 FakeServerProcedure procedure = 187 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 188 testProcedures.add(procedure); 189 ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, 190 HConstants.NO_NONCE); 191 } 192 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 193 FakeServerProcedure failedProcedure = 194 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 195 ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, 196 HConstants.NO_NONCE); 197 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 198 assertFalse(failedProcedure.isWorkerAcquired()); 199 // let one procedure finish and release worker 200 testProcedures.get(0).countDown(); 201 TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); 202 assertTrue(testProcedures.get(0).isSuccess()); 203 } 204 205 @Test 206 public void testGetWALsToSplit() throws Exception { 207 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); 208 // load table 209 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 210 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 211 List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true); 212 assertEquals(1, metaWals.size()); 213 List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false); 214 assertEquals(1, wals.size()); 215 ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 216 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 217 .get(); 218 metaWals = splitWALManager.getWALsToSplit(testServer, true); 219 assertEquals(0, metaWals.size()); 220 } 221 222 private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception { 223 HMaster hmaster = testUtil.getHBaseCluster().getMaster(); 224 SplitWALManager splitWALManager = hmaster.getSplitWALManager(); 225 LOG.info( 226 "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri()); 227 LOG.info( 228 "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri()); 229 230 testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE); 231 // load table 232 testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY); 233 ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor(); 234 ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta(); 235 ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream() 236 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 237 .get(); 238 List<Procedure> procedures = splitWALManager.splitWALs(testServer, false); 239 assertEquals(1, procedures.size()); 240 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 241 assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); 242 243 // Validate the old WAL file archive dir 244 Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir(); 245 Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 246 FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem(); 247 int archiveFileCount = walFS.listStatus(walArchivePath).length; 248 249 procedures = splitWALManager.splitWALs(metaServer, true); 250 assertEquals(1, procedures.size()); 251 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 252 assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); 253 assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); 254 // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish 255 assertEquals(archiveFileCount + 1, walFS.listStatus(walArchivePath).length, 256 "Splitted WAL files should be archived"); 257 } 258 259 @Test 260 public void testSplitLogs() throws Exception { 261 splitLogsTestHelper(TEST_UTIL); 262 } 263 264 @Test 265 public void testSplitLogsWithDifferentWalAndRootFS() throws Exception { 266 HBaseTestingUtil testUtil2 = new HBaseTestingUtil(); 267 testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 268 testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 269 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir"); 270 testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString()); 271 CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir); 272 testUtil2.startMiniCluster(3); 273 splitLogsTestHelper(testUtil2); 274 testUtil2.shutdownMiniCluster(); 275 } 276 277 @Test 278 public void testWorkerReloadWhenMasterRestart() throws Exception { 279 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 280 for (int i = 0; i < 3; i++) { 281 FakeServerProcedure procedure = 282 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 283 testProcedures.add(procedure); 284 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure, 285 HConstants.NO_NONCE, HConstants.NO_NONCE); 286 } 287 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 288 // Kill master 289 TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); 290 TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); 291 // restart master 292 TEST_UTIL.getHBaseCluster().startMaster(); 293 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 294 this.master = TEST_UTIL.getHBaseCluster().getMaster(); 295 296 FakeServerProcedure failedProcedure = 297 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 298 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, 299 HConstants.NO_NONCE, HConstants.NO_NONCE); 300 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 301 assertFalse(failedProcedure.isWorkerAcquired()); 302 for (int i = 0; i < 3; i++) { 303 testProcedures.get(i).countDown(); 304 } 305 failedProcedure.countDown(); 306 } 307 308 public static final class FakeServerProcedure 309 extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> 310 implements ServerProcedureInterface { 311 312 private ServerName serverName; 313 private volatile ServerName worker; 314 private CountDownLatch barrier = new CountDownLatch(1); 315 private volatile boolean triedToAcquire = false; 316 317 public FakeServerProcedure() { 318 } 319 320 public FakeServerProcedure(ServerName serverName) { 321 this.serverName = serverName; 322 } 323 324 public ServerName getServerName() { 325 return serverName; 326 } 327 328 @Override 329 public boolean hasMetaTableRegion() { 330 return false; 331 } 332 333 @Override 334 public ServerOperationType getServerOperationType() { 335 return SPLIT_WAL; 336 } 337 338 @Override 339 protected Flow executeFromState(MasterProcedureEnv env, 340 MasterProcedureProtos.SplitWALState state) 341 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 342 SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); 343 switch (state) { 344 case ACQUIRE_SPLIT_WAL_WORKER: 345 triedToAcquire = true; 346 worker = splitWALManager.acquireSplitWALWorker(this); 347 setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); 348 return Flow.HAS_MORE_STATE; 349 case DISPATCH_WAL_TO_WORKER: 350 barrier.await(); 351 setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); 352 return Flow.HAS_MORE_STATE; 353 case RELEASE_SPLIT_WORKER: 354 splitWALManager.releaseSplitWALWorker(worker); 355 return Flow.NO_MORE_STATE; 356 default: 357 throw new UnsupportedOperationException("unhandled state=" + state); 358 } 359 } 360 361 public boolean isWorkerAcquired() { 362 return worker != null; 363 } 364 365 public boolean isTriedToAcquire() { 366 return triedToAcquire; 367 } 368 369 public void countDown() { 370 this.barrier.countDown(); 371 } 372 373 @Override 374 protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) 375 throws IOException, InterruptedException { 376 377 } 378 379 @Override 380 protected MasterProcedureProtos.SplitWALState getState(int stateId) { 381 return MasterProcedureProtos.SplitWALState.forNumber(stateId); 382 } 383 384 @Override 385 protected int getStateId(MasterProcedureProtos.SplitWALState state) { 386 return state.getNumber(); 387 } 388 389 @Override 390 protected MasterProcedureProtos.SplitWALState getInitialState() { 391 return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; 392 } 393 394 @Override 395 protected boolean holdLock(MasterProcedureEnv env) { 396 return true; 397 } 398 399 @Override 400 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 401 402 } 403 404 @Override 405 protected boolean abort(MasterProcedureEnv env) { 406 return false; 407 } 408 409 @Override 410 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 411 MasterProcedureProtos.SplitWALData.Builder builder = 412 MasterProcedureProtos.SplitWALData.newBuilder(); 413 builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName)); 414 serializer.serialize(builder.build()); 415 } 416 417 @Override 418 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 419 MasterProcedureProtos.SplitWALData data = 420 serializer.deserialize(MasterProcedureProtos.SplitWALData.class); 421 serverName = ProtobufUtil.toServerName(data.getCrashedServer()); 422 } 423 } 424}