001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.locking; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.ArgumentMatchers.eq; 025import static org.mockito.ArgumentMatchers.isA; 026import static org.mockito.Mockito.atLeastOnce; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.util.concurrent.ThreadLocalRandom; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HBaseIOException; 037import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.SmallTests; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.Threads; 042import org.junit.jupiter.api.BeforeEach; 043import org.junit.jupiter.api.Tag; 044import org.junit.jupiter.api.Test; 045import org.mockito.ArgumentCaptor; 046import org.mockito.Mockito; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 051 052import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType; 058 059@Tag(ClientTests.TAG) 060@Tag(SmallTests.TAG) 061public class TestEntityLocks { 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class); 064 065 private final Configuration conf = HBaseConfiguration.create(); 066 067 private final LockService.BlockingInterface master = 068 Mockito.mock(LockService.BlockingInterface.class); 069 070 private LockServiceClient admin; 071 private ArgumentCaptor<LockRequest> lockReqArgCaptor; 072 private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor; 073 074 private static final LockHeartbeatResponse UNLOCKED_RESPONSE = LockHeartbeatResponse.newBuilder() 075 .setLockStatus(LockHeartbeatResponse.LockStatus.UNLOCKED).build(); 076 // timeout such that worker thread waits for 500ms for each heartbeat. 077 private static final LockHeartbeatResponse LOCKED_RESPONSE = LockHeartbeatResponse.newBuilder() 078 .setLockStatus(LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build(); 079 private long procId; 080 081 // Setup mock admin. 082 LockServiceClient getAdmin() throws Exception { 083 conf.setInt("hbase.client.retries.number", 3); 084 conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure. 085 return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get()); 086 } 087 088 @BeforeEach 089 public void setUp() throws Exception { 090 admin = getAdmin(); 091 lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class); 092 lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class); 093 procId = ThreadLocalRandom.current().nextLong(); 094 } 095 096 private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) { 097 long startMillis = EnvironmentEdgeManager.currentTime(); 098 while (lock.isLocked()) { 099 LOG.info("Sleeping..."); 100 Threads.sleepWithoutInterrupt(100); 101 if (!lock.isLocked()) { 102 return true; 103 } 104 if (EnvironmentEdgeManager.currentTime() - startMillis > maxWaitTimeMillis) { 105 LOG.info("Timedout..."); 106 return false; 107 } 108 } 109 return true; // to make compiler happy. 110 } 111 112 /** 113 * Test basic lock function - requestLock, await, unlock. 114 */ 115 @Test 116 public void testEntityLock() throws Exception { 117 final long procId = 100; 118 final long workerSleepTime = 200; // in ms 119 EntityLock lock = admin.namespaceLock("namespace", "description", null); 120 lock.setTestingSleepTime(workerSleepTime); 121 122 when(master.requestLock(any(), any())) 123 .thenReturn(LockResponse.newBuilder().setProcId(procId).build()); 124 when(master.lockHeartbeat(any(), any())).thenReturn(UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, 125 UNLOCKED_RESPONSE, LOCKED_RESPONSE); 126 127 lock.requestLock(); 128 // we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime 129 lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS); 130 assertTrue(lock.isLocked()); 131 lock.unlock(); 132 assertTrue(!lock.getWorker().isAlive()); 133 assertFalse(lock.isLocked()); 134 135 // check LockRequest in requestLock() 136 verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture()); 137 LockRequest request = lockReqArgCaptor.getValue(); 138 assertEquals("namespace", request.getNamespace()); 139 assertEquals("description", request.getDescription()); 140 assertEquals(LockType.EXCLUSIVE, request.getLockType()); 141 assertEquals(0, request.getRegionInfoCount()); 142 143 // check LockHeartbeatRequest in lockHeartbeat() 144 verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture()); 145 for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) { 146 assertEquals(procId, req.getProcId()); 147 } 148 } 149 150 /** 151 * Test that abort is called when lock times out. 152 */ 153 @Test 154 public void testEntityLockTimeout() throws Exception { 155 final long workerSleepTime = 200; // in ms 156 Abortable abortable = Mockito.mock(Abortable.class); 157 EntityLock lock = admin.namespaceLock("namespace", "description", abortable); 158 lock.setTestingSleepTime(workerSleepTime); 159 160 when(master.requestLock(any(), any())) 161 .thenReturn(LockResponse.newBuilder().setProcId(procId).build()); 162 // Acquires the lock, but then it times out (since we don't call unlock() on it). 163 when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE); 164 165 lock.requestLock(); 166 lock.await(); 167 assertTrue(lock.isLocked()); 168 // Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure. 169 assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime)); 170 171 // Works' run() returns, there is a small gap that the thread is still alive(os 172 // has not declare it is dead yet), so remove the following assertion. 173 // assertFalse(lock.getWorker().isAlive()); 174 verify(abortable, times(1)).abort(any(), eq(null)); 175 } 176 177 /** 178 * Test that abort is called when lockHeartbeat fails with IOException. 179 */ 180 @Test 181 public void testHeartbeatException() throws Exception { 182 final long workerSleepTime = 100; // in ms 183 Abortable abortable = Mockito.mock(Abortable.class); 184 EntityLock lock = admin.namespaceLock("namespace", "description", abortable); 185 lock.setTestingSleepTime(workerSleepTime); 186 187 when(master.requestLock(any(), any())) 188 .thenReturn(LockResponse.newBuilder().setProcId(procId).build()); 189 when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE) 190 .thenThrow(new ServiceException("Failed heartbeat!")); 191 192 lock.requestLock(); 193 lock.await(); 194 assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime)); 195 while (lock.getWorker().isAlive()) { 196 TimeUnit.MILLISECONDS.sleep(100); 197 } 198 verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class)); 199 assertFalse(lock.getWorker().isAlive()); 200 } 201}