001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.CountDownLatch; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 037import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 038import org.apache.hadoop.hbase.procedure2.Procedure; 039import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 041import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 043import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 044import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.testclassification.MasterTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.CommonFSUtils; 049import org.apache.hadoop.hbase.util.JVMClusterUtil; 050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 051import org.junit.After; 052import org.junit.Assert; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 061 062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; 064 065@Category({ MasterTests.class, LargeTests.class }) 066 067public class TestSplitWALManager { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestSplitWALManager.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class); 074 private static HBaseTestingUtility TEST_UTIL; 075 private HMaster master; 076 private SplitWALManager splitWALManager; 077 private TableName TABLE_NAME; 078 private byte[] FAMILY; 079 080 @Before 081 public void setup() throws Exception { 082 TEST_UTIL = new HBaseTestingUtility(); 083 TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 084 TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 085 TEST_UTIL.startMiniCluster(3); 086 master = TEST_UTIL.getHBaseCluster().getMaster(); 087 splitWALManager = master.getSplitWALManager(); 088 TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); 089 FAMILY = Bytes.toBytes("test"); 090 } 091 092 @After 093 public void teardown() throws Exception { 094 TEST_UTIL.shutdownMiniCluster(); 095 } 096 097 @Test 098 public void testAcquireAndRelease() throws Exception { 099 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 100 for (int i = 0; i < 4; i++) { 101 testProcedures 102 .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); 103 } 104 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); 105 Assert.assertNotNull(server); 106 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); 107 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); 108 109 Exception e = null; 110 try { 111 splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); 112 } catch (ProcedureSuspendedException suspendException) { 113 e = suspendException; 114 } 115 Assert.assertNotNull(e); 116 Assert.assertTrue(e instanceof ProcedureSuspendedException); 117 118 splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() 119 .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 120 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 121 } 122 123 @Test 124 public void testAddNewServer() throws Exception { 125 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 126 for (int i = 0; i < 4; i++) { 127 testProcedures 128 .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); 129 } 130 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); 131 Assert.assertNotNull(server); 132 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); 133 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); 134 135 Exception e = null; 136 try { 137 splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); 138 } catch (ProcedureSuspendedException suspendException) { 139 e = suspendException; 140 } 141 Assert.assertNotNull(e); 142 Assert.assertTrue(e instanceof ProcedureSuspendedException); 143 144 JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); 145 newServer.waitForServerOnline(); 146 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); 147 } 148 149 @Test 150 public void testCreateSplitWALProcedures() throws Exception { 151 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); 152 // load table 153 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 154 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 155 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 156 Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), 157 AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); 158 // Test splitting meta wal 159 FileStatus[] wals = 160 TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); 161 Assert.assertEquals(1, wals.length); 162 List<Procedure> testProcedures = 163 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 164 Assert.assertEquals(1, testProcedures.size()); 165 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 166 Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 167 168 // Test splitting wal 169 wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); 170 Assert.assertEquals(1, wals.length); 171 testProcedures = 172 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); 173 Assert.assertEquals(1, testProcedures.size()); 174 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); 175 Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); 176 } 177 178 @Test 179 public void testAcquireAndReleaseSplitWALWorker() throws Exception { 180 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); 181 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 182 for (int i = 0; i < 3; i++) { 183 FakeServerProcedure procedure = 184 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 185 testProcedures.add(procedure); 186 ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, 187 HConstants.NO_NONCE); 188 } 189 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 190 FakeServerProcedure failedProcedure = 191 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 192 ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, 193 HConstants.NO_NONCE); 194 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 195 Assert.assertFalse(failedProcedure.isWorkerAcquired()); 196 // let one procedure finish and release worker 197 testProcedures.get(0).countDown(); 198 TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); 199 Assert.assertTrue(testProcedures.get(0).isSuccess()); 200 } 201 202 @Test 203 public void testGetWALsToSplit() throws Exception { 204 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); 205 // load table 206 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); 207 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 208 List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true); 209 Assert.assertEquals(1, metaWals.size()); 210 List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false); 211 Assert.assertEquals(1, wals.size()); 212 ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 213 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 214 .get(); 215 metaWals = splitWALManager.getWALsToSplit(testServer, true); 216 Assert.assertEquals(0, metaWals.size()); 217 } 218 219 private void splitLogsTestHelper(HBaseTestingUtility testUtil) throws Exception { 220 HMaster hmaster = testUtil.getHBaseCluster().getMaster(); 221 SplitWALManager splitWALManager = hmaster.getSplitWALManager(); 222 LOG.info( 223 "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri()); 224 LOG.info( 225 "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri()); 226 227 testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE); 228 // load table 229 testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY); 230 ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor(); 231 ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta(); 232 ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream() 233 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() 234 .get(); 235 List<Procedure> procedures = splitWALManager.splitWALs(testServer, false); 236 Assert.assertEquals(1, procedures.size()); 237 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 238 Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); 239 240 // Validate the old WAL file archive dir 241 Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir(); 242 Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 243 FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem(); 244 int archiveFileCount = walFS.listStatus(walArchivePath).length; 245 246 procedures = splitWALManager.splitWALs(metaServer, true); 247 Assert.assertEquals(1, procedures.size()); 248 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); 249 Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); 250 Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); 251 // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish 252 Assert.assertEquals("Splitted WAL files should be archived", archiveFileCount + 1, 253 walFS.listStatus(walArchivePath).length); 254 } 255 256 @Test 257 public void testSplitLogs() throws Exception { 258 splitLogsTestHelper(TEST_UTIL); 259 } 260 261 @Test 262 public void testSplitLogsWithDifferentWalAndRootFS() throws Exception { 263 HBaseTestingUtility testUtil2 = new HBaseTestingUtility(); 264 testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); 265 testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); 266 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir"); 267 testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString()); 268 CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir); 269 testUtil2.startMiniCluster(3); 270 splitLogsTestHelper(testUtil2); 271 testUtil2.shutdownMiniCluster(); 272 } 273 274 @Test 275 public void testWorkerReloadWhenMasterRestart() throws Exception { 276 List<FakeServerProcedure> testProcedures = new ArrayList<>(); 277 for (int i = 0; i < 3; i++) { 278 FakeServerProcedure procedure = 279 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); 280 testProcedures.add(procedure); 281 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure, 282 HConstants.NO_NONCE, HConstants.NO_NONCE); 283 } 284 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); 285 // Kill master 286 TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); 287 TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); 288 // restart master 289 TEST_UTIL.getHBaseCluster().startMaster(); 290 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 291 this.master = TEST_UTIL.getHBaseCluster().getMaster(); 292 293 FakeServerProcedure failedProcedure = 294 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); 295 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, 296 HConstants.NO_NONCE, HConstants.NO_NONCE); 297 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); 298 Assert.assertFalse(failedProcedure.isWorkerAcquired()); 299 for (int i = 0; i < 3; i++) { 300 testProcedures.get(i).countDown(); 301 } 302 failedProcedure.countDown(); 303 } 304 305 public static final class FakeServerProcedure 306 extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> 307 implements ServerProcedureInterface { 308 309 private ServerName serverName; 310 private ServerName worker; 311 private CountDownLatch barrier = new CountDownLatch(1); 312 private boolean triedToAcquire = false; 313 314 public FakeServerProcedure() { 315 } 316 317 public FakeServerProcedure(ServerName serverName) { 318 this.serverName = serverName; 319 } 320 321 public ServerName getServerName() { 322 return serverName; 323 } 324 325 @Override 326 public boolean hasMetaTableRegion() { 327 return false; 328 } 329 330 @Override 331 public ServerOperationType getServerOperationType() { 332 return SPLIT_WAL; 333 } 334 335 @Override 336 protected Flow executeFromState(MasterProcedureEnv env, 337 MasterProcedureProtos.SplitWALState state) 338 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 339 SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); 340 switch (state) { 341 case ACQUIRE_SPLIT_WAL_WORKER: 342 triedToAcquire = true; 343 worker = splitWALManager.acquireSplitWALWorker(this); 344 setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); 345 return Flow.HAS_MORE_STATE; 346 case DISPATCH_WAL_TO_WORKER: 347 barrier.await(); 348 setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); 349 return Flow.HAS_MORE_STATE; 350 case RELEASE_SPLIT_WORKER: 351 splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); 352 return Flow.NO_MORE_STATE; 353 default: 354 throw new UnsupportedOperationException("unhandled state=" + state); 355 } 356 } 357 358 public boolean isWorkerAcquired() { 359 return worker != null; 360 } 361 362 public boolean isTriedToAcquire() { 363 return triedToAcquire; 364 } 365 366 public void countDown() { 367 this.barrier.countDown(); 368 } 369 370 @Override 371 protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) 372 throws IOException, InterruptedException { 373 374 } 375 376 @Override 377 protected MasterProcedureProtos.SplitWALState getState(int stateId) { 378 return MasterProcedureProtos.SplitWALState.forNumber(stateId); 379 } 380 381 @Override 382 protected int getStateId(MasterProcedureProtos.SplitWALState state) { 383 return state.getNumber(); 384 } 385 386 @Override 387 protected MasterProcedureProtos.SplitWALState getInitialState() { 388 return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; 389 } 390 391 @Override 392 protected boolean holdLock(MasterProcedureEnv env) { 393 return true; 394 } 395 396 @Override 397 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 398 399 } 400 401 @Override 402 protected boolean abort(MasterProcedureEnv env) { 403 return false; 404 } 405 406 @Override 407 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 408 MasterProcedureProtos.SplitWALData.Builder builder = 409 MasterProcedureProtos.SplitWALData.newBuilder(); 410 builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName)); 411 serializer.serialize(builder.build()); 412 } 413 414 @Override 415 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 416 MasterProcedureProtos.SplitWALData data = 417 serializer.deserialize(MasterProcedureProtos.SplitWALData.class); 418 serverName = ProtobufUtil.toServerName(data.getCrashedServer()); 419 } 420 } 421}