/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.locking;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;

/**
 * Tests {@link LockProcedure} on a one-node mini cluster, both through lock requests and
 * heartbeats sent to the master's {@link MasterRpcServices} and through procedures submitted
 * directly to the master's {@link ProcedureExecutor}.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
public class TestLockProcedure {

  // crank this up if this test turns out to be flaky.
  private static final int HEARTBEAT_TIMEOUT = 2000;
  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
  // How long (ms) a test waits for a queued lock to be acquired before failing.
  private static final long LOCK_ACQUIRE_TIMEOUT_MS = 2000;
  // Upper bound (ms) on waiting for a procedure to be started after the executor is restarted,
  // so a recovery problem fails the test instead of hanging it.
  private static final long PROC_START_TIMEOUT_MS = 60000;

  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static MasterRpcServices masterRpcService;
  private static ProcedureExecutor<MasterProcedureEnv> procExec;

  private static String namespace = "namespace";
  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
  private static List<RegionInfo> tableRegions1;
  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
  private static List<RegionInfo> tableRegions2;

  // Name of the currently running test method; used as the description of lock requests.
  private String testMethodName;

  /** Applies the procedure-executor and lock-timeout settings this test relies on. */
  private static void setupConf(Configuration conf) {
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
    conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
  }

  /**
   * Starts the mini cluster, creates the namespace and the two pre-split tables, and caches the
   * master RPC services, the master procedure executor and the regions of both tables.
   */
  @BeforeAll
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(1);
    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
    UTIL.createTable(tableName1, new byte[][] { Bytes.toBytes("fam") },
      new byte[][] { Bytes.toBytes("1") });
    UTIL.createTable(tableName2, new byte[][] { Bytes.toBytes("fam") },
      new byte[][] { Bytes.toBytes("1") });
    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
    // Use JUnit assertions rather than the 'assert' keyword, which is skipped unless the JVM
    // runs with -ea.
    assertFalse(tableRegions1.isEmpty(), "No regions found for " + tableName1);
    assertFalse(tableRegions2.isEmpty(), "No regions found for " + tableName2);
  }

  /** Shuts the mini cluster down; a shutdown failure is logged and otherwise ignored. */
  @AfterAll
  public static void cleanupTest() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  /** Turns off kill-before-store-update on the executor and records the test method name. */
  @BeforeEach
  public void setup(TestInfo testInfo) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    testMethodName = testInfo.getTestMethod().get().getName();
  }

  /**
   * Aborts every procedure known to the executor, waits for each one, and verifies that the
   * procedure scheduler is empty afterwards.
   */
  @AfterEach
  public void tearDown() throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    // Kill all running procedures.
    for (Procedure<?> proc : procExec.getProcedures()) {
      procExec.abort(proc.getProcId());
      ProcedureTestingUtility.waitProcedure(procExec, proc);
    }
    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
  }

  /** Builds an EXCLUSIVE lock request that names only a namespace. */
  private LockRequest getNamespaceLock(String namespace, String description) {
    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, namespace, null,
      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /** Builds an EXCLUSIVE lock request that names only a table. */
  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, tableName,
      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /** Builds an EXCLUSIVE lock request that names only a list of regions. */
  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, null,
      regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /**
   * Asserts that submitting {@code lockRequest} fails with a {@link ServiceException} caused by a
   * {@link DoNotRetryIOException}, and that the exception message starts with the
   * IllegalArgumentException prefix followed by {@code message}.
   */
  private void validateLockRequestException(LockRequest lockRequest, String message)
    throws Exception {
    ServiceException serviceException =
      assertThrows(ServiceException.class, () -> masterRpcService.requestLock(null, lockRequest));
    assertThat(serviceException.getCause(), instanceOf(DoNotRetryIOException.class));
    assertThat(serviceException.getMessage(),
      startsWith(
        "org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalArgumentException: "
          + message));
  }

  @Test
  public void testLockRequestValidationEmptyDescription() throws Exception {
    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
  }

  @Test
  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
  }

  @Test
  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
    List<RegionInfo> regions = new ArrayList<>();
    regions.addAll(tableRegions1);
    regions.addAll(tableRegions2);
    validateLockRequestException(getRegionLock(regions, "desc"),
      "All regions should be from same table");
  }

  /**
   * Polls the lock with heartbeats every 100 ms until it is reported as LOCKED or the timeout
   * elapses. When the lock is reported as LOCKED, also asserts that the heartbeat response
   * carries {@link #HEARTBEAT_TIMEOUT} as its timeout.
   * <p>
   * Expiry is signalled through the return value; this method does not throw
   * {@link TimeoutException}.
   * @param procId      id of the lock procedure to poll
   * @param timeoutInMs maximum time to keep polling, in milliseconds
   * @return true as soon as the lock is reported as LOCKED, false if the timeout elapsed first
   */
  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
    long deadline = EnvironmentEdgeManager.currentTime() + timeoutInMs;
    while (EnvironmentEdgeManager.currentTime() < deadline) {
      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
        LOG.debug("Proc id {} acquired lock.", procId);
        return true;
      }
      Thread.sleep(100);
    }
    return false;
  }

  /**
   * Polls every 250 ms until the executor reports the procedure as started, failing the test if
   * that does not happen within {@link #PROC_START_TIMEOUT_MS}.
   */
  private void awaitProcedureStarted(long procId) throws InterruptedException {
    long deadline = EnvironmentEdgeManager.currentTime() + PROC_START_TIMEOUT_MS;
    while (!procExec.isStarted(procId)) {
      assertTrue(EnvironmentEdgeManager.currentTime() < deadline,
        "Procedure " + procId + " was not started within " + PROC_START_TIMEOUT_MS + " ms");
      Thread.sleep(250);
    }
  }

  /** Submits the lock request to the master and returns the id of the resulting procedure. */
  private long queueLock(LockRequest lockRequest) throws ServiceException {
    LockResponse response = masterRpcService.requestLock(null, lockRequest);
    return response.getProcId();
  }

  /**
   * Sends one heartbeat for the lock and asserts that the reported status is LOCKED when
   * {@code isLocked} is true and UNLOCKED otherwise.
   */
  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
      LockHeartbeatRequest.newBuilder().setProcId(procId).build());
    if (isLocked) {
      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
    } else {
      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
    }
    LOG.debug("Proc id {} : {}.", procId, response.getLockStatus());
  }

  /** Releases the lock by sending a heartbeat with keep-alive set to false. */
  private void releaseLock(long procId) throws ServiceException {
    masterRpcService.lockHeartbeat(null,
      LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
  }

  /**
   * Queues the lock, then checks that it stays locked across three heartbeats sent half a
   * heartbeat timeout apart, that it is unlocked after an explicit release, and that the
   * procedure finishes without failure.
   */
  private void checkHeartbeatsKeepLockUntilReleased(LockRequest lock) throws Exception {
    final long procId = queueLock(lock);
    assertTrue(awaitForLocked(procId, LOCK_ACQUIRE_TIMEOUT_MS));
    for (int i = 0; i < 3; i++) {
      Thread.sleep(HEARTBEAT_TIMEOUT / 2);
      sendHeartbeatAndCheckLocked(procId, true);
    }
    releaseLock(procId);
    sendHeartbeatAndCheckLocked(procId, false);
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  @Test
  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
    checkHeartbeatsKeepLockUntilReleased(getTableExclusiveLock(tableName1, testMethodName));
  }

  @Test
  public void testAbort() throws Exception {
    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
    final long procId = queueLock(lock);
    assertTrue(awaitForLocked(procId, LOCK_ACQUIRE_TIMEOUT_MS));
    assertTrue(procExec.abort(procId));
    sendHeartbeatAndCheckLocked(procId, false);
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  @Test
  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
    checkHeartbeatsKeepLockUntilReleased(getNamespaceLock(namespace, testMethodName));
  }

  // Test that the lock is reported as unlocked once heartbeats stop for long enough.
  @Test
  public void testTimeout() throws Exception {
    LockRequest lock = getNamespaceLock(namespace, testMethodName);
    final long procId = queueLock(lock);
    assertTrue(awaitForLocked(procId, LOCK_ACQUIRE_TIMEOUT_MS));
    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
    sendHeartbeatAndCheckLocked(procId, true);
    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
    sendHeartbeatAndCheckLocked(procId, true);
    // No heartbeat for several timeout periods.
    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
    sendHeartbeatAndCheckLocked(procId, false);
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  @Test
  public void testMultipleLocks() throws Exception {
    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
    LockRequest tableLock2 = getTableExclusiveLock(tableName2, testMethodName);
    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
    // Acquire namespace lock, then queue other locks.
    long nsProcId = queueLock(nsLock);
    assertTrue(awaitForLocked(nsProcId, LOCK_ACQUIRE_TIMEOUT_MS));
    long start = EnvironmentEdgeManager.currentTime();
    sendHeartbeatAndCheckLocked(nsProcId, true);
    long table1ProcId = queueLock(tableLock1);
    long table2ProcId = queueLock(tableLock2);
    long regions1ProcId = queueLock(regionsLock1);
    long regions2ProcId = queueLock(regionsLock2);

    // Assert tables & region locks are waiting because of namespace lock.
    long now = EnvironmentEdgeManager.currentTime();
    // leave extra 10 msec in case more than half the HEARTBEAT_TIMEOUT has passed
    Thread
      .sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT - (now - start) - 10, 0)));
    sendHeartbeatAndCheckLocked(nsProcId, true);
    sendHeartbeatAndCheckLocked(table1ProcId, false);
    sendHeartbeatAndCheckLocked(table2ProcId, false);
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release namespace lock and assert tables locks are acquired but not region lock
    releaseLock(nsProcId);
    assertTrue(awaitForLocked(table1ProcId, LOCK_ACQUIRE_TIMEOUT_MS));
    assertTrue(awaitForLocked(table2ProcId, LOCK_ACQUIRE_TIMEOUT_MS));
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release table1 lock and assert region lock is acquired.
    releaseLock(table1ProcId);
    sendHeartbeatAndCheckLocked(table1ProcId, false);
    assertTrue(awaitForLocked(regions1ProcId, LOCK_ACQUIRE_TIMEOUT_MS));
    sendHeartbeatAndCheckLocked(table2ProcId, true);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release table2 lock and assert region lock is acquired.
    releaseLock(table2ProcId);
    sendHeartbeatAndCheckLocked(table2ProcId, false);
    assertTrue(awaitForLocked(regions2ProcId, LOCK_ACQUIRE_TIMEOUT_MS));
    sendHeartbeatAndCheckLocked(regions1ProcId, true);
    sendHeartbeatAndCheckLocked(regions2ProcId, true);

    // Release region locks.
    releaseLock(regions1ProcId);
    releaseLock(regions2ProcId);
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);
    ProcedureTestingUtility.waitAllProcedures(procExec);
    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
  }

  // Test latch is decreased in count when lock is acquired.
  @Test
  public void testLatch() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
      LockType.EXCLUSIVE, "desc", latch);
    procExec.submitProcedure(lockProc);
    assertTrue(latch.await(LOCK_ACQUIRE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
    releaseLock(lockProc.getProcId());
    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
  }

  // LockProcedures with latch are considered local locks.
  @Test
  public void testLocalLockTimeout() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
      LockType.EXCLUSIVE, "desc", latch);
    procExec.submitProcedure(lockProc);
    assertTrue(awaitForLocked(lockProc.getProcId(), LOCK_ACQUIRE_TIMEOUT_MS));
    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
    assertTrue(lockProc.isLocked());
    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
    assertFalse(lockProc.isLocked());
    releaseLock(lockProc.getProcId());
    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
  }

  /**
   * Queues the given remote lock with kill-before-store-update enabled, restarts the procedure
   * executor once it has stopped, and then checks that the lock is held again, survives
   * heartbeats, and is reported as unlocked after heartbeats stop.
   */
  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
    final long procId = queueLock(lock);
    assertTrue(awaitForLocked(procId, LOCK_ACQUIRE_TIMEOUT_MS));

    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    ProcedureTestingUtility.restart(procExec);
    awaitProcedureStarted(procId);
    assertTrue(procExec.isRunning());

    // After recovery, remote locks should reacquire locks and function normally.
    assertTrue(awaitForLocked(procId, LOCK_ACQUIRE_TIMEOUT_MS));
    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
    sendHeartbeatAndCheckLocked(procId, true);
    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
    sendHeartbeatAndCheckLocked(procId, true);
    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT / 2);
    sendHeartbeatAndCheckLocked(procId, false);
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  @Test
  public void testRemoteTableLockRecovery() throws Exception {
    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
    testRemoteLockRecovery(lock);
  }

  @Test
  public void testRemoteNamespaceLockRecovery() throws Exception {
    LockRequest lock = getNamespaceLock(namespace, testMethodName);
    testRemoteLockRecovery(lock);
  }

  @Test
  public void testRemoteRegionLockRecovery() throws Exception {
    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
    testRemoteLockRecovery(lock);
  }

  @Test
  public void testLocalMasterLockRecovery() throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
    CountDownLatch latch = new CountDownLatch(1);
    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
      LockType.EXCLUSIVE, "desc", latch);
    procExec.submitProcedure(lockProc);
    assertTrue(latch.await(LOCK_ACQUIRE_TIMEOUT_MS, TimeUnit.MILLISECONDS));

    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
    assertFalse(procExec.isRunning());
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    // NOTE(review): a comment here used to say "remove zk lock node otherwise recovered lock will
    // keep waiting on it", but this method performs no such step - confirm it is not needed.
    ProcedureTestingUtility.restart(procExec);
    awaitProcedureStarted(lockProc.getProcId());
    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
    assertTrue(result != null && !result.isFailed());
    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
  }
}