/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.locking;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType;

/**
 * Tests {@link EntityLock} instances obtained from a {@link LockServiceClient} whose master-side
 * {@link LockService.BlockingInterface} is a Mockito mock, so no cluster is required.
 */
@Category({ ClientTests.class, SmallTests.class })
public class TestEntityLocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEntityLocks.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class);

  private final Configuration conf = HBaseConfiguration.create();

  /** Mocked master stub; each test scripts its requestLock/lockHeartbeat responses. */
  private final LockService.BlockingInterface master =
    Mockito.mock(LockService.BlockingInterface.class);

  private LockServiceClient admin;
  private ArgumentCaptor<LockRequest> lockReqArgCaptor;
  private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor;

  /** Heartbeat response reporting that the lock is not (or no longer) held. */
  private static final LockHeartbeatResponse UNLOCKED_RESPONSE = LockHeartbeatResponse.newBuilder()
    .setLockStatus(LockHeartbeatResponse.LockStatus.UNLOCKED).build();
  /**
   * Heartbeat response reporting that the lock is held, with a lock timeout of 10000 ms. Every
   * test here also calls {@code setTestingSleepTime} on the lock; presumably that, rather than
   * this timeout, determines how long the worker waits between heartbeats (confirm against
   * EntityLock).
   */
  private static final LockHeartbeatResponse LOCKED_RESPONSE = LockHeartbeatResponse.newBuilder()
    .setLockStatus(LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build();
  /** Procedure id returned by the mocked requestLock(); re-randomized before each test. */
  private long procId;

  /**
   * Builds a {@link LockServiceClient} that talks to the mocked master.
   * <p>
   * Sets the client retry count to 3 and the client pause to 1 ms on the shared configuration so
   * that a failed rpc is retried almost immediately.
   * @return a new client backed by {@link #master}
   */
  LockServiceClient getAdmin() throws Exception {
    conf.setInt("hbase.client.retries.number", 3);
    conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure.
    return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get());
  }

  /** Creates a fresh client, fresh argument captors and a random procedure id for each test. */
  @Before
  public void setUp() throws Exception {
    admin = getAdmin();
    lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class);
    lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class);
    procId = ThreadLocalRandom.current().nextLong();
  }

  /**
   * Polls {@code lock} every 100 ms until it reports that it is no longer locked.
   * @param lock              the lock to watch
   * @param maxWaitTimeMillis how long to keep polling before giving up, in milliseconds
   * @return {@code true} if the lock was observed unlocked (including when it was already
   *         unlocked on entry), {@code false} if it was still locked once the wait budget was
   *         exceeded
   */
  private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) {
    long startMillis = EnvironmentEdgeManager.currentTime();
    while (lock.isLocked()) {
      LOG.info("Sleeping...");
      Threads.sleepWithoutInterrupt(100);
      if (!lock.isLocked()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() - startMillis > maxWaitTimeMillis) {
        LOG.info("Timedout...");
        return false;
      }
    }
    // Reached when the lock is observed unlocked by the loop condition itself, e.g. when it was
    // already released before the first poll.
    return true;
  }

  /**
   * Test basic lock function - requestLock, await, unlock.
   * <p>
   * The mocked master answers the first three heartbeats with UNLOCKED and every later one with
   * LOCKED. The test then checks the captured {@link LockRequest} fields and that every captured
   * heartbeat carries the procedure id returned by requestLock().
   */
  @Test
  public void testEntityLock() throws Exception {
    final long workerSleepTime = 200; // in ms
    EntityLock lock = admin.namespaceLock("namespace", "description", null);
    lock.setTestingSleepTime(workerSleepTime);

    when(master.requestLock(any(), any()))
      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
    when(master.lockHeartbeat(any(), any())).thenReturn(UNLOCKED_RESPONSE, UNLOCKED_RESPONSE,
      UNLOCKED_RESPONSE, LOCKED_RESPONSE);

    lock.requestLock();
    // we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime
    lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS);
    assertTrue(lock.isLocked());
    lock.unlock();
    assertFalse(lock.getWorker().isAlive());
    assertFalse(lock.isLocked());

    // check LockRequest in requestLock()
    verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture());
    LockRequest request = lockReqArgCaptor.getValue();
    assertEquals("namespace", request.getNamespace());
    assertEquals("description", request.getDescription());
    assertEquals(LockType.EXCLUSIVE, request.getLockType());
    assertEquals(0, request.getRegionInfoCount());

    // check LockHeartbeatRequest in lockHeartbeat()
    verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture());
    for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) {
      assertEquals(procId, req.getProcId());
    }
  }

  /**
   * Test that abort is called when lock times out.
   * <p>
   * The mocked master reports LOCKED on the first heartbeat and UNLOCKED afterwards; the test
   * expects exactly one {@code abort} call with a {@code null} cause.
   */
  @Test
  public void testEntityLockTimeout() throws Exception {
    final long workerSleepTime = 200; // in ms
    Abortable abortable = Mockito.mock(Abortable.class);
    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
    lock.setTestingSleepTime(workerSleepTime);

    when(master.requestLock(any(), any()))
      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
    // Acquires the lock, but then it times out (since we don't call unlock() on it).
    when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);

    lock.requestLock();
    lock.await();
    assertTrue(lock.isLocked());
    // Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure.
    assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime));

    // After the worker's run() returns there is a small window in which the thread may still be
    // reported as alive, so this test intentionally does not assert on
    // lock.getWorker().isAlive().
    verify(abortable, times(1)).abort(any(), eq(null));
  }

  /**
   * Test that abort is called when lockHeartbeat fails with IOException.
   * <p>
   * The mocked master reports LOCKED on the first heartbeat and throws a {@link ServiceException}
   * on later ones; the test expects exactly one {@code abort} call whose cause is an
   * {@link HBaseIOException}.
   */
  @Test
  public void testHeartbeatException() throws Exception {
    final long workerSleepTime = 100; // in ms
    Abortable abortable = Mockito.mock(Abortable.class);
    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
    lock.setTestingSleepTime(workerSleepTime);

    when(master.requestLock(any(), any()))
      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
    when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE)
      .thenThrow(new ServiceException("Failed heartbeat!"));

    lock.requestLock();
    lock.await();
    assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
    // Wait for the worker thread to finish before verifying the abort call.
    while (lock.getWorker().isAlive()) {
      TimeUnit.MILLISECONDS.sleep(100);
    }
    verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
    assertFalse(lock.getWorker().isAlive());
  }
}