001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.locking;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Matchers.any;
024import static org.mockito.Matchers.eq;
025import static org.mockito.Matchers.isA;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.util.Random;
032import java.util.concurrent.TimeUnit;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Threads;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.mockito.ArgumentCaptor;
047import org.mockito.Mockito;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType;
059
060@Category({ClientTests.class, SmallTests.class})
061public class TestEntityLocks {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestEntityLocks.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class);
068
069  private final Configuration conf = HBaseConfiguration.create();
070
071  private final LockService.BlockingInterface master =
072    Mockito.mock(LockService.BlockingInterface.class);
073
074  private LockServiceClient admin;
075  private ArgumentCaptor<LockRequest> lockReqArgCaptor;
076  private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor;
077
078  private static final LockHeartbeatResponse UNLOCKED_RESPONSE =
079      LockHeartbeatResponse.newBuilder().setLockStatus(
080          LockHeartbeatResponse.LockStatus.UNLOCKED).build();
081  // timeout such that worker thread waits for 500ms for each heartbeat.
082  private static final LockHeartbeatResponse LOCKED_RESPONSE =
083      LockHeartbeatResponse.newBuilder().setLockStatus(
084          LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build();
085  private long procId;
086
087  // Setup mock admin.
088  LockServiceClient getAdmin() throws Exception {
089    conf.setInt("hbase.client.retries.number", 3);
090    conf.setInt("hbase.client.pause", 1);  // 1ms. Immediately retry rpc on failure.
091    return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get());
092  }
093
094  @Before
095  public void setUp() throws Exception {
096    admin = getAdmin();
097    lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class);
098    lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class);
099    procId = new Random().nextLong();
100  }
101
102  private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) {
103    long startMillis = System.currentTimeMillis();
104    while (lock.isLocked()) {
105      LOG.info("Sleeping...");
106      Threads.sleepWithoutInterrupt(100);
107      if (!lock.isLocked()) {
108        return true;
109      }
110      if (System.currentTimeMillis() - startMillis > maxWaitTimeMillis) {
111        LOG.info("Timedout...");
112        return false;
113      }
114    }
115    return true; // to make compiler happy.
116  }
117
118  /**
119   * Test basic lock function - requestLock, await, unlock.
120   * @throws Exception
121   */
122  @Test
123  public void testEntityLock() throws Exception {
124    final long procId = 100;
125    final long workerSleepTime = 200;  // in ms
126    EntityLock lock = admin.namespaceLock("namespace", "description", null);
127    lock.setTestingSleepTime(workerSleepTime);
128
129    when(master.requestLock(any(), any())).thenReturn(
130        LockResponse.newBuilder().setProcId(procId).build());
131    when(master.lockHeartbeat(any(), any())).thenReturn(
132        UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, LOCKED_RESPONSE);
133
134    lock.requestLock();
135    // we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime
136    lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS);
137    assertTrue(lock.isLocked());
138    lock.unlock();
139    assertTrue(!lock.getWorker().isAlive());
140    assertFalse(lock.isLocked());
141
142    // check LockRequest in requestLock()
143    verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture());
144    LockRequest request = lockReqArgCaptor.getValue();
145    assertEquals("namespace", request.getNamespace());
146    assertEquals("description", request.getDescription());
147    assertEquals(LockType.EXCLUSIVE, request.getLockType());
148    assertEquals(0, request.getRegionInfoCount());
149
150    // check LockHeartbeatRequest in lockHeartbeat()
151    verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture());
152    for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) {
153      assertEquals(procId, req.getProcId());
154    }
155  }
156
157  /**
158   * Test that abort is called when lock times out.
159   */
160  @Test
161  public void testEntityLockTimeout() throws Exception {
162    final long workerSleepTime = 200;  // in ms
163    Abortable abortable = Mockito.mock(Abortable.class);
164    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
165    lock.setTestingSleepTime(workerSleepTime);
166
167    when(master.requestLock(any(), any()))
168        .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
169    // Acquires the lock, but then it times out (since we don't call unlock() on it).
170    when(master.lockHeartbeat(any(), any()))
171      .thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);
172
173    lock.requestLock();
174    lock.await();
175    assertTrue(lock.isLocked());
176    // Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure.
177    assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime));
178
179    // Works' run() returns, there is a small gap that the thread is still alive(os
180    // has not declare it is dead yet), so remove the following assertion.
181    // assertFalse(lock.getWorker().isAlive());
182    verify(abortable, times(1)).abort(any(), eq(null));
183  }
184
185  /**
186   * Test that abort is called when lockHeartbeat fails with IOException.
187   */
188  @Test
189  public void testHeartbeatException() throws Exception {
190    final long workerSleepTime = 100;  // in ms
191    Abortable abortable = Mockito.mock(Abortable.class);
192    EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
193    lock.setTestingSleepTime(workerSleepTime);
194
195    when(master.requestLock(any(), any()))
196        .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
197    when(master.lockHeartbeat(any(), any()))
198        .thenReturn(LOCKED_RESPONSE)
199        .thenThrow(new ServiceException("Failed heartbeat!"));
200
201    lock.requestLock();
202    lock.await();
203    assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
204    while (lock.getWorker().isAlive()) {
205      TimeUnit.MILLISECONDS.sleep(100);
206    }
207    verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
208    assertFalse(lock.getWorker().isAlive());
209  }
210}