001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.locking;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.ArgumentMatchers.eq;
025import static org.mockito.ArgumentMatchers.isA;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.TimeUnit;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseIOException;
037import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.jupiter.api.BeforeEach;
043import org.junit.jupiter.api.Tag;
044import org.junit.jupiter.api.Test;
045import org.mockito.ArgumentCaptor;
046import org.mockito.Mockito;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
051
052import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType;
058
059@Tag(ClientTests.TAG)
060@Tag(SmallTests.TAG)
061public class TestEntityLocks {
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class);
064
065  private final Configuration conf = HBaseConfiguration.create();
066
067  private final LockService.BlockingInterface master =
068    Mockito.mock(LockService.BlockingInterface.class);
069
070  private LockServiceClient admin;
071  private ArgumentCaptor<LockRequest> lockReqArgCaptor;
072  private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor;
073
074  private static final LockHeartbeatResponse UNLOCKED_RESPONSE = LockHeartbeatResponse.newBuilder()
075    .setLockStatus(LockHeartbeatResponse.LockStatus.UNLOCKED).build();
076  // timeout such that worker thread waits for 500ms for each heartbeat.
077  private static final LockHeartbeatResponse LOCKED_RESPONSE = LockHeartbeatResponse.newBuilder()
078    .setLockStatus(LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build();
079  private long procId;
080
081  // Setup mock admin.
082  LockServiceClient getAdmin() throws Exception {
083    conf.setInt("hbase.client.retries.number", 3);
084    conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure.
085    return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get());
086  }
087
088  @BeforeEach
089  public void setUp() throws Exception {
090    admin = getAdmin();
091    lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class);
092    lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class);
093    procId = ThreadLocalRandom.current().nextLong();
094  }
095
096  private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) {
097    long startMillis = EnvironmentEdgeManager.currentTime();
098    while (lock.isLocked()) {
099      LOG.info("Sleeping...");
100      Threads.sleepWithoutInterrupt(100);
101      if (!lock.isLocked()) {
102        return true;
103      }
104      if (EnvironmentEdgeManager.currentTime() - startMillis > maxWaitTimeMillis) {
105        LOG.info("Timedout...");
106        return false;
107      }
108    }
109    return true; // to make compiler happy.
110  }
111
112  /**
113   * Test basic lock function - requestLock, await, unlock.
114   */
115  @Test
116  public void testEntityLock() throws Exception {
117    final long procId = 100;
118    final long workerSleepTime = 200; // in ms
119    EntityLock lock = admin.namespaceLock("namespace", "description", null);
120    lock.setTestingSleepTime(workerSleepTime);
121
122    when(master.requestLock(any(), any()))
123      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
124    when(master.lockHeartbeat(any(), any())).thenReturn(UNLOCKED_RESPONSE, UNLOCKED_RESPONSE,
125      UNLOCKED_RESPONSE, LOCKED_RESPONSE);
126
127    lock.requestLock();
128    // we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime
129    lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS);
130    assertTrue(lock.isLocked());
131    lock.unlock();
132    assertTrue(!lock.getWorker().isAlive());
133    assertFalse(lock.isLocked());
134
135    // check LockRequest in requestLock()
136    verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture());
137    LockRequest request = lockReqArgCaptor.getValue();
138    assertEquals("namespace", request.getNamespace());
139    assertEquals("description", request.getDescription());
140    assertEquals(LockType.EXCLUSIVE, request.getLockType());
141    assertEquals(0, request.getRegionInfoCount());
142
143    // check LockHeartbeatRequest in lockHeartbeat()
144    verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture());
145    for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) {
146      assertEquals(procId, req.getProcId());
147    }
148  }
149
150  /**
151   * Test that abort is called when lock times out.
152   */
153  @Test
154  public void testEntityLockTimeout() throws Exception {
155    final long workerSleepTime = 200; // in ms
156    Abortable abortable = Mockito.mock(Abortable.class);
157    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
158    lock.setTestingSleepTime(workerSleepTime);
159
160    when(master.requestLock(any(), any()))
161      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
162    // Acquires the lock, but then it times out (since we don't call unlock() on it).
163    when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);
164
165    lock.requestLock();
166    lock.await();
167    assertTrue(lock.isLocked());
168    // Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure.
169    assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime));
170
171    // Works' run() returns, there is a small gap that the thread is still alive(os
172    // has not declare it is dead yet), so remove the following assertion.
173    // assertFalse(lock.getWorker().isAlive());
174    verify(abortable, times(1)).abort(any(), eq(null));
175  }
176
177  /**
178   * Test that abort is called when lockHeartbeat fails with IOException.
179   */
180  @Test
181  public void testHeartbeatException() throws Exception {
182    final long workerSleepTime = 100; // in ms
183    Abortable abortable = Mockito.mock(Abortable.class);
184    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
185    lock.setTestingSleepTime(workerSleepTime);
186
187    when(master.requestLock(any(), any()))
188      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
189    when(master.lockHeartbeat(any(), any())).thenReturn(LOCKED_RESPONSE)
190      .thenThrow(new ServiceException("Failed heartbeat!"));
191
192    lock.requestLock();
193    lock.await();
194    assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
195    while (lock.getWorker().isAlive()) {
196      TimeUnit.MILLISECONDS.sleep(100);
197    }
198    verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
199    assertFalse(lock.getWorker().isAlive());
200  }
201}