001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.locking.LockServiceClient;
038import org.apache.hadoop.hbase.master.MasterRpcServices;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
040import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
041import org.apache.hadoop.hbase.procedure2.LockType;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.hamcrest.core.IsInstanceOf;
050import org.hamcrest.core.StringStartsWith;
051import org.junit.After;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Rule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.rules.ExpectedException;
060import org.junit.rules.TestName;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
065
066import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
071
072@Category({ MasterTests.class, LargeTests.class })
073public class TestLockProcedure {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestLockProcedure.class);
078
079  @Rule
080  public final ExpectedException exception = ExpectedException.none();
081  @Rule
082  public TestName testName = new TestName();
083  // crank this up if this test turns out to be flaky.
084  private static final int HEARTBEAT_TIMEOUT = 2000;
085  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
088  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
089  private static MasterRpcServices masterRpcService;
090  private static ProcedureExecutor<MasterProcedureEnv> procExec;
091
092  private static String namespace = "namespace";
093  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
094  private static List<RegionInfo> tableRegions1;
095  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
096  private static List<RegionInfo> tableRegions2;
097
098  private String testMethodName;
099
100  private static void setupConf(Configuration conf) {
101    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
102    conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
103    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
104    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
105  }
106
107  @BeforeClass
108  public static void setupCluster() throws Exception {
109    setupConf(UTIL.getConfiguration());
110    UTIL.startMiniCluster(1);
111    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
112    UTIL.createTable(tableName1, new byte[][] { Bytes.toBytes("fam") },
113      new byte[][] { Bytes.toBytes("1") });
114    UTIL.createTable(tableName2, new byte[][] { Bytes.toBytes("fam") },
115      new byte[][] { Bytes.toBytes("1") });
116    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
117    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
118    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
119    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
120    assert tableRegions1.size() > 0;
121    assert tableRegions2.size() > 0;
122  }
123
124  @AfterClass
125  public static void cleanupTest() throws Exception {
126    try {
127      UTIL.shutdownMiniCluster();
128    } catch (Exception e) {
129      LOG.warn("failure shutting down cluster", e);
130    }
131  }
132
133  @Before
134  public void setup() throws Exception {
135    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
136    testMethodName = testName.getMethodName();
137  }
138
139  @After
140  public void tearDown() throws Exception {
141    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
142    // Kill all running procedures.
143    for (Procedure<?> proc : procExec.getProcedures()) {
144      procExec.abort(proc.getProcId());
145      ProcedureTestingUtility.waitProcedure(procExec, proc);
146    }
147    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
148  }
149
150  private LockRequest getNamespaceLock(String namespace, String description) {
151    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, namespace, null,
152      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
153  }
154
155  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
156    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, tableName,
157      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
158  }
159
160  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
161    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, null,
162      regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
163  }
164
165  private void validateLockRequestException(LockRequest lockRequest, String message)
166    throws Exception {
167    exception.expect(ServiceException.class);
168    exception.expectCause(IsInstanceOf.instanceOf(DoNotRetryIOException.class));
169    exception
170      .expectMessage(StringStartsWith.startsWith("org.apache.hadoop.hbase.DoNotRetryIOException: "
171        + "java.lang.IllegalArgumentException: " + message));
172    masterRpcService.requestLock(null, lockRequest);
173  }
174
175  @Test
176  public void testLockRequestValidationEmptyDescription() throws Exception {
177    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
178  }
179
180  @Test
181  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
182    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
183  }
184
185  @Test
186  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
187    List<RegionInfo> regions = new ArrayList<>();
188    regions.addAll(tableRegions1);
189    regions.addAll(tableRegions2);
190    validateLockRequestException(getRegionLock(regions, "desc"),
191      "All regions should be from same table");
192  }
193
194  /**
195   * Returns immediately if the lock is acquired.
196   * @throws TimeoutException if lock couldn't be acquired.
197   */
198  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
199    long deadline = EnvironmentEdgeManager.currentTime() + timeoutInMs;
200    while (EnvironmentEdgeManager.currentTime() < deadline) {
201      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
202        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
203      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
204        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
205        LOG.debug(String.format("Proc id %s acquired lock.", procId));
206        return true;
207      }
208      Thread.sleep(100);
209    }
210    return false;
211  }
212
213  private long queueLock(LockRequest lockRequest) throws ServiceException {
214    LockResponse response = masterRpcService.requestLock(null, lockRequest);
215    return response.getProcId();
216  }
217
218  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
219    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
220      LockHeartbeatRequest.newBuilder().setProcId(procId).build());
221    if (isLocked) {
222      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
223    } else {
224      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
225    }
226    LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
227  }
228
229  private void releaseLock(long procId) throws ServiceException {
230    masterRpcService.lockHeartbeat(null,
231      LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
232  }
233
234  @Test
235  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
236    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
237    final long procId = queueLock(lock);
238    assertTrue(awaitForLocked(procId, 2000));
239    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
240    sendHeartbeatAndCheckLocked(procId, true);
241    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
242    sendHeartbeatAndCheckLocked(procId, true);
243    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
244    sendHeartbeatAndCheckLocked(procId, true);
245    releaseLock(procId);
246    sendHeartbeatAndCheckLocked(procId, false);
247    ProcedureTestingUtility.waitProcedure(procExec, procId);
248    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
249  }
250
251  @Test
252  public void testAbort() throws Exception {
253    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
254    final long procId = queueLock(lock);
255    assertTrue(awaitForLocked(procId, 2000));
256    assertTrue(procExec.abort(procId));
257    sendHeartbeatAndCheckLocked(procId, false);
258    ProcedureTestingUtility.waitProcedure(procExec, procId);
259    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
260  }
261
262  @Test
263  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
264    LockRequest lock = getNamespaceLock(namespace, testMethodName);
265    final long procId = queueLock(lock);
266    assertTrue(awaitForLocked(procId, 2000));
267    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
268    sendHeartbeatAndCheckLocked(procId, true);
269    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
270    sendHeartbeatAndCheckLocked(procId, true);
271    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
272    sendHeartbeatAndCheckLocked(procId, true);
273    releaseLock(procId);
274    sendHeartbeatAndCheckLocked(procId, false);
275    ProcedureTestingUtility.waitProcedure(procExec, procId);
276    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
277  }
278
279  @Test
280  public void testTimeout() throws Exception {
281    LockRequest lock = getNamespaceLock(namespace, testMethodName);
282    final long procId = queueLock(lock);
283    assertTrue(awaitForLocked(procId, 2000));
284    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
285    sendHeartbeatAndCheckLocked(procId, true);
286    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
287    sendHeartbeatAndCheckLocked(procId, true);
288    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
289    sendHeartbeatAndCheckLocked(procId, false);
290    ProcedureTestingUtility.waitProcedure(procExec, procId);
291    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
292  }
293
294  @Test
295  public void testMultipleLocks() throws Exception {
296    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
297    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
298    LockRequest tableLock2 = getTableExclusiveLock(tableName2, testMethodName);
299    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
300    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
301    // Acquire namespace lock, then queue other locks.
302    long nsProcId = queueLock(nsLock);
303    assertTrue(awaitForLocked(nsProcId, 2000));
304    long start = EnvironmentEdgeManager.currentTime();
305    sendHeartbeatAndCheckLocked(nsProcId, true);
306    long table1ProcId = queueLock(tableLock1);
307    long table2ProcId = queueLock(tableLock2);
308    long regions1ProcId = queueLock(regionsLock1);
309    long regions2ProcId = queueLock(regionsLock2);
310
311    // Assert tables & region locks are waiting because of namespace lock.
312    long now = EnvironmentEdgeManager.currentTime();
313    // leave extra 10 msec in case more than half the HEARTBEAT_TIMEOUT has passed
314    Thread
315      .sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT - (now - start) - 10, 0)));
316    sendHeartbeatAndCheckLocked(nsProcId, true);
317    sendHeartbeatAndCheckLocked(table1ProcId, false);
318    sendHeartbeatAndCheckLocked(table2ProcId, false);
319    sendHeartbeatAndCheckLocked(regions1ProcId, false);
320    sendHeartbeatAndCheckLocked(regions2ProcId, false);
321
322    // Release namespace lock and assert tables locks are acquired but not region lock
323    releaseLock(nsProcId);
324    assertTrue(awaitForLocked(table1ProcId, 2000));
325    assertTrue(awaitForLocked(table2ProcId, 2000));
326    sendHeartbeatAndCheckLocked(regions1ProcId, false);
327    sendHeartbeatAndCheckLocked(regions2ProcId, false);
328
329    // Release table1 lock and assert region lock is acquired.
330    releaseLock(table1ProcId);
331    sendHeartbeatAndCheckLocked(table1ProcId, false);
332    assertTrue(awaitForLocked(regions1ProcId, 2000));
333    sendHeartbeatAndCheckLocked(table2ProcId, true);
334    sendHeartbeatAndCheckLocked(regions2ProcId, false);
335
336    // Release table2 lock and assert region lock is acquired.
337    releaseLock(table2ProcId);
338    sendHeartbeatAndCheckLocked(table2ProcId, false);
339    assertTrue(awaitForLocked(regions2ProcId, 2000));
340    sendHeartbeatAndCheckLocked(regions1ProcId, true);
341    sendHeartbeatAndCheckLocked(regions2ProcId, true);
342
343    // Release region locks.
344    releaseLock(regions1ProcId);
345    releaseLock(regions2ProcId);
346    sendHeartbeatAndCheckLocked(regions1ProcId, false);
347    sendHeartbeatAndCheckLocked(regions2ProcId, false);
348    ProcedureTestingUtility.waitAllProcedures(procExec);
349    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
350    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
351    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
352    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
353    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
354  }
355
356  // Test latch is decreased in count when lock is acquired.
357  @Test
358  public void testLatch() throws Exception {
359    CountDownLatch latch = new CountDownLatch(1);
360    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
361    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
362      org.apache.hadoop.hbase.procedure2.LockType.EXCLUSIVE, "desc", latch);
363    procExec.submitProcedure(lockProc);
364    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
365    releaseLock(lockProc.getProcId());
366    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
367    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
368  }
369
370  // LockProcedures with latch are considered local locks.
371  @Test
372  public void testLocalLockTimeout() throws Exception {
373    CountDownLatch latch = new CountDownLatch(1);
374    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
375    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
376      LockType.EXCLUSIVE, "desc", latch);
377    procExec.submitProcedure(lockProc);
378    assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
379    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
380    assertTrue(lockProc.isLocked());
381    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
382    assertFalse(lockProc.isLocked());
383    releaseLock(lockProc.getProcId());
384    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
385    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
386  }
387
388  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
389    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
390    final long procId = queueLock(lock);
391    assertTrue(awaitForLocked(procId, 2000));
392
393    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
394    ProcedureTestingUtility.waitProcedure(procExec, procId);
395    assertEquals(false, procExec.isRunning());
396    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
397    ProcedureTestingUtility.restart(procExec);
398    while (!procExec.isStarted(procId)) {
399      Thread.sleep(250);
400    }
401    assertEquals(true, procExec.isRunning());
402
403    // After recovery, remote locks should reacquire locks and function normally.
404    assertTrue(awaitForLocked(procId, 2000));
405    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
406    sendHeartbeatAndCheckLocked(procId, true);
407    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
408    sendHeartbeatAndCheckLocked(procId, true);
409    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT / 2);
410    sendHeartbeatAndCheckLocked(procId, false);
411    ProcedureTestingUtility.waitProcedure(procExec, procId);
412    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
413  }
414
415  @Test
416  public void testRemoteTableLockRecovery() throws Exception {
417    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
418    testRemoteLockRecovery(lock);
419  }
420
421  @Test
422  public void testRemoteNamespaceLockRecovery() throws Exception {
423    LockRequest lock = getNamespaceLock(namespace, testMethodName);
424    testRemoteLockRecovery(lock);
425  }
426
427  @Test
428  public void testRemoteRegionLockRecovery() throws Exception {
429    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
430    testRemoteLockRecovery(lock);
431  }
432
433  @Test
434  public void testLocalMasterLockRecovery() throws Exception {
435    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
436    CountDownLatch latch = new CountDownLatch(1);
437    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
438      LockType.EXCLUSIVE, "desc", latch);
439    procExec.submitProcedure(lockProc);
440    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
441
442    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
443    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
444    assertEquals(false, procExec.isRunning());
445    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
446    // remove zk lock node otherwise recovered lock will keep waiting on it.
447    ProcedureTestingUtility.restart(procExec);
448    while (!procExec.isStarted(lockProc.getProcId())) {
449      Thread.sleep(250);
450    }
451    assertEquals(true, procExec.isRunning());
452    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
453    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
454    assertTrue(result != null && !result.isFailed());
455    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
456  }
457}