001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertThrows; 025import static org.junit.Assert.assertTrue; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.CountDownLatch; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 040import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 041import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 042import org.apache.hadoop.hbase.procedure2.Procedure; 043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 044import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 045import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 046import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 047import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 048import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.MasterTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.JVMClusterUtil; 055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 056import org.junit.After; 057import org.junit.Before; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 065 066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; 068 069@Category({ MasterTests.class, LargeTests.class }) 070public class TestSplitWALManager { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestSplitWALManager.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class); 077 private static HBaseTestingUtil TEST_UTIL; 078 private HMaster master; 079 private SplitWALManager splitWALManager; 080 private TableName TABLE_NAME; 081 private byte[] FAMILY; 082 083 @Before 084 public void setUp() throws Exception { 085 TEST_UTIL = new HBaseTestingUtil(); 086 TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 087 TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 5); 088 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 089 TEST_UTIL.startMiniCluster(3); 090 master = TEST_UTIL.getHBaseCluster().getMaster(); 091 splitWALManager = master.getSplitWALManager(); 092 TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); 093 FAMILY = Bytes.toBytes("test"); 094 } 095 096 @After 097 public void tearDown() throws Exception { 098 TEST_UTIL.shutdownMiniCluster(); 099 } 100 101 @Test 102 public void testAcquireAndRelease() throws Exception { 103 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 104 for (int i = 0; i < 4; i++) { 105 testProcedures.add(new FakeServerProcedure( 106 ServerName.valueOf("server" + i, 12345, EnvironmentEdgeManager.currentTime()))); 107 } 108 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 109 procExec.submitProcedure(testProcedures.get(0)); 110 TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired()); 111 procExec.submitProcedure(testProcedures.get(1)); 112 procExec.submitProcedure(testProcedures.get(2)); 113 TEST_UTIL.waitFor(10000, 114 () -> testProcedures.get(1).isWorkerAcquired() && testProcedures.get(2).isWorkerAcquired()); 115 116 // should get a ProcedureSuspendedException, so it will try to acquire but can not get a worker 117 procExec.submitProcedure(testProcedures.get(3)); 118 TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire()); 119 for (int i = 0; i < 3; i++) { 120 Thread.sleep(1000); 121 assertFalse(testProcedures.get(3).isWorkerAcquired()); 122 } 123 124 // release a worker, the last procedure should be able to get a worker 125 testProcedures.get(0).countDown(); 126 TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired()); 127 128 for (int i = 1; i < 4; i++) { 129 testProcedures.get(i).countDown(); 130 } 131 for (int i = 0; i < 4; i++) { 132 final int index = i; 133 TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished()); 134 } 135 } 136 137 @Test 138 public void testAddNewServer() throws Exception { 139 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 140 for (int i = 0; i < 4; i++) { 141 testProcedures.add( 142 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName())); 143 } 144 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); 145 assertNotNull(server); 146 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); 147 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); 148 149 assertThrows(ProcedureSuspendedException.class, 150 () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 151 152 JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); 153 newServer.waitForServerOnline(); 154 assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 155 } 156 157 @Test 158 public void testCreateSplitWALProcedures() throws Exception { 159 TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 160 // load table 161 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 162 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 163 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 164 Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), 165 AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); 166 // Test splitting meta wal 167 FileStatus[] wals = 168 TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); 169 assertEquals(1, wals.length); 170 List<Procedure> testProcedures = 171 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 172 assertEquals(1, testProcedures.size()); 173 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 174 assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 175 176 // Test splitting wal 177 wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); 178 assertEquals(1, wals.length); 179 testProcedures = 180 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 181 assertEquals(1, testProcedures.size()); 182 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 183 assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 184 } 185 186 @Test 187 public void testAcquireAndReleaseSplitWALWorker() throws Exception { 188 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 189 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 190 for (int i = 0; i < 3; i++) { 191 FakeServerProcedure procedure = 192 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 193 testProcedures.add(procedure); 194 ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, 195 HConstants.NO_NONCE); 196 } 197 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 198 FakeServerProcedure failedProcedure = 199 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 200 ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, 201 HConstants.NO_NONCE); 202 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 203 assertFalse(failedProcedure.isWorkerAcquired()); 204 // let one procedure finish and release worker 205 testProcedures.get(0).countDown(); 206 TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); 207 assertTrue(testProcedures.get(0).isSuccess()); 208 } 209 210 @Test 211 public void testGetWALsToSplit() throws Exception { 212 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); 213 // load table 214 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 215 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 216 List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true); 217 assertEquals(1, metaWals.size()); 218 List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false); 219 assertEquals(1, wals.size()); 220 ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 221 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 222 .get(); 223 metaWals = splitWALManager.getWALsToSplit(testServer, true); 224 assertEquals(0, metaWals.size()); 225 } 226 227 private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception { 228 HMaster hmaster = testUtil.getHBaseCluster().getMaster(); 229 SplitWALManager splitWALManager = hmaster.getSplitWALManager(); 230 LOG.info( 231 "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri()); 232 LOG.info( 233 "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri()); 234 235 testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE); 236 // load table 237 testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY); 238 ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor(); 239 ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta(); 240 ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream() 241 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 242 .get(); 243 List<Procedure> procedures = splitWALManager.splitWALs(testServer, false); 244 assertEquals(1, procedures.size()); 245 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 246 assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); 247 248 // Validate the old WAL file archive dir 249 Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir(); 250 Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 251 FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem(); 252 int archiveFileCount = walFS.listStatus(walArchivePath).length; 253 254 procedures = splitWALManager.splitWALs(metaServer, true); 255 assertEquals(1, procedures.size()); 256 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 257 assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); 258 assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); 259 // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish 260 assertEquals("Splitted WAL files should be archived", archiveFileCount + 1, 261 walFS.listStatus(walArchivePath).length); 262 } 263 264 @Test 265 public void testSplitLogs() throws Exception { 266 splitLogsTestHelper(TEST_UTIL); 267 } 268 269 @Test 270 public void testSplitLogsWithDifferentWalAndRootFS() throws Exception { 271 HBaseTestingUtil testUtil2 = new HBaseTestingUtil(); 272 testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 273 testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 274 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir"); 275 testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString()); 276 CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir); 277 testUtil2.startMiniCluster(3); 278 splitLogsTestHelper(testUtil2); 279 testUtil2.shutdownMiniCluster(); 280 } 281 282 @Test 283 public void testWorkerReloadWhenMasterRestart() throws Exception { 284 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 285 for (int i = 0; i < 3; i++) { 286 FakeServerProcedure procedure = 287 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 288 testProcedures.add(procedure); 289 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure, 290 HConstants.NO_NONCE, HConstants.NO_NONCE); 291 } 292 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 293 // Kill master 294 TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); 295 TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); 296 // restart master 297 TEST_UTIL.getHBaseCluster().startMaster(); 298 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 299 this.master = TEST_UTIL.getHBaseCluster().getMaster(); 300 301 FakeServerProcedure failedProcedure = 302 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 303 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, 304 HConstants.NO_NONCE, HConstants.NO_NONCE); 305 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 306 assertFalse(failedProcedure.isWorkerAcquired()); 307 for (int i = 0; i < 3; i++) { 308 testProcedures.get(i).countDown(); 309 } 310 failedProcedure.countDown(); 311 } 312 313 public static final class FakeServerProcedure 314 extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> 315 implements ServerProcedureInterface { 316 317 private ServerName serverName; 318 private volatile ServerName worker; 319 private CountDownLatch barrier = new CountDownLatch(1); 320 private volatile boolean triedToAcquire = false; 321 322 public FakeServerProcedure() { 323 } 324 325 public FakeServerProcedure(ServerName serverName) { 326 this.serverName = serverName; 327 } 328 329 public ServerName getServerName() { 330 return serverName; 331 } 332 333 @Override 334 public boolean hasMetaTableRegion() { 335 return false; 336 } 337 338 @Override 339 public ServerOperationType getServerOperationType() { 340 return SPLIT_WAL; 341 } 342 343 @Override 344 protected Flow executeFromState(MasterProcedureEnv env, 345 MasterProcedureProtos.SplitWALState state) 346 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 347 SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); 348 switch (state) { 349 case ACQUIRE_SPLIT_WAL_WORKER: 350 triedToAcquire = true; 351 worker = splitWALManager.acquireSplitWALWorker(this); 352 setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); 353 return Flow.HAS_MORE_STATE; 354 case DISPATCH_WAL_TO_WORKER: 355 barrier.await(); 356 setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); 357 return Flow.HAS_MORE_STATE; 358 case RELEASE_SPLIT_WORKER: 359 splitWALManager.releaseSplitWALWorker(worker); 360 return Flow.NO_MORE_STATE; 361 default: 362 throw new UnsupportedOperationException("unhandled state=" + state); 363 } 364 } 365 366 public boolean isWorkerAcquired() { 367 return worker != null; 368 } 369 370 public boolean isTriedToAcquire() { 371 return triedToAcquire; 372 } 373 374 public void countDown() { 375 this.barrier.countDown(); 376 } 377 378 @Override 379 protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) 380 throws IOException, InterruptedException { 381 382 } 383 384 @Override 385 protected MasterProcedureProtos.SplitWALState getState(int stateId) { 386 return MasterProcedureProtos.SplitWALState.forNumber(stateId); 387 } 388 389 @Override 390 protected int getStateId(MasterProcedureProtos.SplitWALState state) { 391 return state.getNumber(); 392 } 393 394 @Override 395 protected MasterProcedureProtos.SplitWALState getInitialState() { 396 return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; 397 } 398 399 @Override 400 protected boolean holdLock(MasterProcedureEnv env) { 401 return true; 402 } 403 404 @Override 405 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 406 407 } 408 409 @Override 410 protected boolean abort(MasterProcedureEnv env) { 411 return false; 412 } 413 414 @Override 415 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 416 MasterProcedureProtos.SplitWALData.Builder builder = 417 MasterProcedureProtos.SplitWALData.newBuilder(); 418 builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName)); 419 serializer.serialize(builder.build()); 420 } 421 422 @Override 423 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 424 MasterProcedureProtos.SplitWALData data = 425 serializer.deserialize(MasterProcedureProtos.SplitWALData.class); 426 serverName = ProtobufUtil.toServerName(data.getCrashedServer()); 427 } 428 } 429}