001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.locking; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Matchers.any; 024import static org.mockito.Matchers.eq; 025import static org.mockito.Matchers.isA; 026import static org.mockito.Mockito.atLeastOnce; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.util.Random; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HBaseIOException; 038import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator; 039import org.apache.hadoop.hbase.testclassification.ClientTests; 040import org.apache.hadoop.hbase.testclassification.SmallTests; 041import org.apache.hadoop.hbase.util.Threads; 042import org.junit.Before; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.mockito.ArgumentCaptor; 047import org.mockito.Mockito; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType; 059 060@Category({ClientTests.class, SmallTests.class}) 061public class TestEntityLocks { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestEntityLocks.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class); 068 069 private final Configuration conf = HBaseConfiguration.create(); 070 071 private final LockService.BlockingInterface master = 072 Mockito.mock(LockService.BlockingInterface.class); 073 074 private LockServiceClient admin; 075 private ArgumentCaptor<LockRequest> lockReqArgCaptor; 076 private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor; 077 078 private static final LockHeartbeatResponse UNLOCKED_RESPONSE = 079 LockHeartbeatResponse.newBuilder().setLockStatus( 080 LockHeartbeatResponse.LockStatus.UNLOCKED).build(); 081 // timeout such that worker thread waits for 500ms for each heartbeat. 082 private static final LockHeartbeatResponse LOCKED_RESPONSE = 083 LockHeartbeatResponse.newBuilder().setLockStatus( 084 LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build(); 085 private long procId; 086 087 // Setup mock admin. 088 LockServiceClient getAdmin() throws Exception { 089 conf.setInt("hbase.client.retries.number", 3); 090 conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure. 091 return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get()); 092 } 093 094 @Before 095 public void setUp() throws Exception { 096 admin = getAdmin(); 097 lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class); 098 lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class); 099 procId = new Random().nextLong(); 100 } 101 102 private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) { 103 long startMillis = System.currentTimeMillis(); 104 while (lock.isLocked()) { 105 LOG.info("Sleeping..."); 106 Threads.sleepWithoutInterrupt(100); 107 if (!lock.isLocked()) { 108 return true; 109 } 110 if (System.currentTimeMillis() - startMillis > maxWaitTimeMillis) { 111 LOG.info("Timedout..."); 112 return false; 113 } 114 } 115 return true; // to make compiler happy. 116 } 117 118 /** 119 * Test basic lock function - requestLock, await, unlock. 120 * @throws Exception 121 */ 122 @Test 123 public void testEntityLock() throws Exception { 124 final long procId = 100; 125 final long workerSleepTime = 200; // in ms 126 EntityLock lock = admin.namespaceLock("namespace", "description", null); 127 lock.setTestingSleepTime(workerSleepTime); 128 129 when(master.requestLock(any(), any())).thenReturn( 130 LockResponse.newBuilder().setProcId(procId).build()); 131 when(master.lockHeartbeat(any(), any())).thenReturn( 132 UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, LOCKED_RESPONSE); 133 134 lock.requestLock(); 135 // we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime 136 lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS); 137 assertTrue(lock.isLocked()); 138 lock.unlock(); 139 assertTrue(!lock.getWorker().isAlive()); 140 assertFalse(lock.isLocked()); 141 142 // check LockRequest in requestLock() 143 verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture()); 144 LockRequest request = lockReqArgCaptor.getValue(); 145 assertEquals("namespace", request.getNamespace()); 146 assertEquals("description", request.getDescription()); 147 assertEquals(LockType.EXCLUSIVE, request.getLockType()); 148 assertEquals(0, request.getRegionInfoCount()); 149 150 // check LockHeartbeatRequest in lockHeartbeat() 151 verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture()); 152 for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) { 153 assertEquals(procId, req.getProcId()); 154 } 155 } 156 157 /** 158 * Test that abort is called when lock times out. 159 */ 160 @Test 161 public void testEntityLockTimeout() throws Exception { 162 final long workerSleepTime = 200; // in ms 163 Abortable abortable = Mockito.mock(Abortable.class); 164 EntityLock lock = admin.namespaceLock("namespace", "description", abortable); 165 lock.setTestingSleepTime(workerSleepTime); 166 167 when(master.requestLock(any(), any())) 168 .thenReturn(LockResponse.newBuilder().setProcId(procId).build()); 169 // Acquires the lock, but then it times out (since we don't call unlock() on it). 170 when(master.lockHeartbeat(any(), any())) 171 .thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE); 172 173 lock.requestLock(); 174 lock.await(); 175 assertTrue(lock.isLocked()); 176 // Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure. 177 assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime)); 178 179 // Works' run() returns, there is a small gap that the thread is still alive(os 180 // has not declare it is dead yet), so remove the following assertion. 181 // assertFalse(lock.getWorker().isAlive()); 182 verify(abortable, times(1)).abort(any(), eq(null)); 183 } 184 185 /** 186 * Test that abort is called when lockHeartbeat fails with IOException. 187 */ 188 @Test 189 public void testHeartbeatException() throws Exception { 190 final long workerSleepTime = 100; // in ms 191 Abortable abortable = Mockito.mock(Abortable.class); 192 EntityLock lock = admin.namespaceLock("namespace", "description", abortable); 193 lock.setTestingSleepTime(workerSleepTime); 194 195 when(master.requestLock(any(), any())) 196 .thenReturn(LockResponse.newBuilder().setProcId(procId).build()); 197 when(master.lockHeartbeat(any(), any())) 198 .thenReturn(LOCKED_RESPONSE) 199 .thenThrow(new ServiceException("Failed heartbeat!")); 200 201 lock.requestLock(); 202 lock.await(); 203 assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime)); 204 while (lock.getWorker().isAlive()) { 205 TimeUnit.MILLISECONDS.sleep(100); 206 } 207 verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class)); 208 assertFalse(lock.getWorker().isAlive()); 209 } 210}