001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.core.StringStartsWith.startsWith;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertThrows;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.NamespaceDescriptor;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.locking.LockServiceClient;
041import org.apache.hadoop.hbase.master.MasterRpcServices;
042import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
043import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
044import org.apache.hadoop.hbase.procedure2.LockType;
045import org.apache.hadoop.hbase.procedure2.Procedure;
046import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
047import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.junit.jupiter.api.AfterAll;
053import org.junit.jupiter.api.AfterEach;
054import org.junit.jupiter.api.BeforeAll;
055import org.junit.jupiter.api.BeforeEach;
056import org.junit.jupiter.api.Tag;
057import org.junit.jupiter.api.Test;
058import org.junit.jupiter.api.TestInfo;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
063
064import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
069
070@Tag(MasterTests.TAG)
071@Tag(LargeTests.TAG)
072public class TestLockProcedure {
073
074  // crank this up if this test turns out to be flaky.
075  private static final int HEARTBEAT_TIMEOUT = 2000;
076  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
079  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
080  private static MasterRpcServices masterRpcService;
081  private static ProcedureExecutor<MasterProcedureEnv> procExec;
082
083  private static String namespace = "namespace";
084  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
085  private static List<RegionInfo> tableRegions1;
086  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
087  private static List<RegionInfo> tableRegions2;
088
089  private String testMethodName;
090
091  private static void setupConf(Configuration conf) {
092    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
093    conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
094    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
095    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
096  }
097
098  @BeforeAll
099  public static void setupCluster() throws Exception {
100    setupConf(UTIL.getConfiguration());
101    UTIL.startMiniCluster(1);
102    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
103    UTIL.createTable(tableName1, new byte[][] { Bytes.toBytes("fam") },
104      new byte[][] { Bytes.toBytes("1") });
105    UTIL.createTable(tableName2, new byte[][] { Bytes.toBytes("fam") },
106      new byte[][] { Bytes.toBytes("1") });
107    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
108    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
109    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
110    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
111    assert tableRegions1.size() > 0;
112    assert tableRegions2.size() > 0;
113  }
114
115  @AfterAll
116  public static void cleanupTest() throws Exception {
117    try {
118      UTIL.shutdownMiniCluster();
119    } catch (Exception e) {
120      LOG.warn("failure shutting down cluster", e);
121    }
122  }
123
124  @BeforeEach
125  public void setup(TestInfo testInfo) throws Exception {
126    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
127    testMethodName = testInfo.getTestMethod().get().getName();
128  }
129
130  @AfterEach
131  public void tearDown() throws Exception {
132    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
133    // Kill all running procedures.
134    for (Procedure<?> proc : procExec.getProcedures()) {
135      procExec.abort(proc.getProcId());
136      ProcedureTestingUtility.waitProcedure(procExec, proc);
137    }
138    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
139  }
140
141  private LockRequest getNamespaceLock(String namespace, String description) {
142    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, namespace, null,
143      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
144  }
145
146  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
147    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, tableName,
148      null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
149  }
150
151  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
152    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE, null, null,
153      regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
154  }
155
156  private void validateLockRequestException(LockRequest lockRequest, String message)
157    throws Exception {
158    ServiceException serviceException =
159      assertThrows(ServiceException.class, () -> masterRpcService.requestLock(null, lockRequest));
160    assertThat(serviceException.getCause(), instanceOf(DoNotRetryIOException.class));
161    assertThat(serviceException.getMessage(),
162      startsWith(
163        "org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalArgumentException: "
164          + message));
165  }
166
167  @Test
168  public void testLockRequestValidationEmptyDescription() throws Exception {
169    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
170  }
171
172  @Test
173  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
174    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
175  }
176
177  @Test
178  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
179    List<RegionInfo> regions = new ArrayList<>();
180    regions.addAll(tableRegions1);
181    regions.addAll(tableRegions2);
182    validateLockRequestException(getRegionLock(regions, "desc"),
183      "All regions should be from same table");
184  }
185
186  /**
187   * Returns immediately if the lock is acquired.
188   * @throws TimeoutException if lock couldn't be acquired.
189   */
190  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
191    long deadline = EnvironmentEdgeManager.currentTime() + timeoutInMs;
192    while (EnvironmentEdgeManager.currentTime() < deadline) {
193      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
194        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
195      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
196        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
197        LOG.debug(String.format("Proc id %s acquired lock.", procId));
198        return true;
199      }
200      Thread.sleep(100);
201    }
202    return false;
203  }
204
205  private long queueLock(LockRequest lockRequest) throws ServiceException {
206    LockResponse response = masterRpcService.requestLock(null, lockRequest);
207    return response.getProcId();
208  }
209
210  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
211    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
212      LockHeartbeatRequest.newBuilder().setProcId(procId).build());
213    if (isLocked) {
214      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
215    } else {
216      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
217    }
218    LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
219  }
220
221  private void releaseLock(long procId) throws ServiceException {
222    masterRpcService.lockHeartbeat(null,
223      LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
224  }
225
226  @Test
227  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
228    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
229    final long procId = queueLock(lock);
230    assertTrue(awaitForLocked(procId, 2000));
231    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
232    sendHeartbeatAndCheckLocked(procId, true);
233    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
234    sendHeartbeatAndCheckLocked(procId, true);
235    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
236    sendHeartbeatAndCheckLocked(procId, true);
237    releaseLock(procId);
238    sendHeartbeatAndCheckLocked(procId, false);
239    ProcedureTestingUtility.waitProcedure(procExec, procId);
240    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
241  }
242
243  @Test
244  public void testAbort() throws Exception {
245    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
246    final long procId = queueLock(lock);
247    assertTrue(awaitForLocked(procId, 2000));
248    assertTrue(procExec.abort(procId));
249    sendHeartbeatAndCheckLocked(procId, false);
250    ProcedureTestingUtility.waitProcedure(procExec, procId);
251    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
252  }
253
254  @Test
255  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
256    LockRequest lock = getNamespaceLock(namespace, testMethodName);
257    final long procId = queueLock(lock);
258    assertTrue(awaitForLocked(procId, 2000));
259    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
260    sendHeartbeatAndCheckLocked(procId, true);
261    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
262    sendHeartbeatAndCheckLocked(procId, true);
263    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
264    sendHeartbeatAndCheckLocked(procId, true);
265    releaseLock(procId);
266    sendHeartbeatAndCheckLocked(procId, false);
267    ProcedureTestingUtility.waitProcedure(procExec, procId);
268    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
269  }
270
271  @Test
272  public void testTimeout() throws Exception {
273    LockRequest lock = getNamespaceLock(namespace, testMethodName);
274    final long procId = queueLock(lock);
275    assertTrue(awaitForLocked(procId, 2000));
276    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
277    sendHeartbeatAndCheckLocked(procId, true);
278    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
279    sendHeartbeatAndCheckLocked(procId, true);
280    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
281    sendHeartbeatAndCheckLocked(procId, false);
282    ProcedureTestingUtility.waitProcedure(procExec, procId);
283    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
284  }
285
286  @Test
287  public void testMultipleLocks() throws Exception {
288    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
289    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
290    LockRequest tableLock2 = getTableExclusiveLock(tableName2, testMethodName);
291    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
292    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
293    // Acquire namespace lock, then queue other locks.
294    long nsProcId = queueLock(nsLock);
295    assertTrue(awaitForLocked(nsProcId, 2000));
296    long start = EnvironmentEdgeManager.currentTime();
297    sendHeartbeatAndCheckLocked(nsProcId, true);
298    long table1ProcId = queueLock(tableLock1);
299    long table2ProcId = queueLock(tableLock2);
300    long regions1ProcId = queueLock(regionsLock1);
301    long regions2ProcId = queueLock(regionsLock2);
302
303    // Assert tables & region locks are waiting because of namespace lock.
304    long now = EnvironmentEdgeManager.currentTime();
305    // leave extra 10 msec in case more than half the HEARTBEAT_TIMEOUT has passed
306    Thread
307      .sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT - (now - start) - 10, 0)));
308    sendHeartbeatAndCheckLocked(nsProcId, true);
309    sendHeartbeatAndCheckLocked(table1ProcId, false);
310    sendHeartbeatAndCheckLocked(table2ProcId, false);
311    sendHeartbeatAndCheckLocked(regions1ProcId, false);
312    sendHeartbeatAndCheckLocked(regions2ProcId, false);
313
314    // Release namespace lock and assert tables locks are acquired but not region lock
315    releaseLock(nsProcId);
316    assertTrue(awaitForLocked(table1ProcId, 2000));
317    assertTrue(awaitForLocked(table2ProcId, 2000));
318    sendHeartbeatAndCheckLocked(regions1ProcId, false);
319    sendHeartbeatAndCheckLocked(regions2ProcId, false);
320
321    // Release table1 lock and assert region lock is acquired.
322    releaseLock(table1ProcId);
323    sendHeartbeatAndCheckLocked(table1ProcId, false);
324    assertTrue(awaitForLocked(regions1ProcId, 2000));
325    sendHeartbeatAndCheckLocked(table2ProcId, true);
326    sendHeartbeatAndCheckLocked(regions2ProcId, false);
327
328    // Release table2 lock and assert region lock is acquired.
329    releaseLock(table2ProcId);
330    sendHeartbeatAndCheckLocked(table2ProcId, false);
331    assertTrue(awaitForLocked(regions2ProcId, 2000));
332    sendHeartbeatAndCheckLocked(regions1ProcId, true);
333    sendHeartbeatAndCheckLocked(regions2ProcId, true);
334
335    // Release region locks.
336    releaseLock(regions1ProcId);
337    releaseLock(regions2ProcId);
338    sendHeartbeatAndCheckLocked(regions1ProcId, false);
339    sendHeartbeatAndCheckLocked(regions2ProcId, false);
340    ProcedureTestingUtility.waitAllProcedures(procExec);
341    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
342    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
343    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
344    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
345    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
346  }
347
348  // Test latch is decreased in count when lock is acquired.
349  @Test
350  public void testLatch() throws Exception {
351    CountDownLatch latch = new CountDownLatch(1);
352    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
353    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
354      org.apache.hadoop.hbase.procedure2.LockType.EXCLUSIVE, "desc", latch);
355    procExec.submitProcedure(lockProc);
356    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
357    releaseLock(lockProc.getProcId());
358    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
359    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
360  }
361
362  // LockProcedures with latch are considered local locks.
363  @Test
364  public void testLocalLockTimeout() throws Exception {
365    CountDownLatch latch = new CountDownLatch(1);
366    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
367    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
368      LockType.EXCLUSIVE, "desc", latch);
369    procExec.submitProcedure(lockProc);
370    assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
371    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
372    assertTrue(lockProc.isLocked());
373    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
374    assertFalse(lockProc.isLocked());
375    releaseLock(lockProc.getProcId());
376    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
377    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
378  }
379
380  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
381    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
382    final long procId = queueLock(lock);
383    assertTrue(awaitForLocked(procId, 2000));
384
385    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
386    ProcedureTestingUtility.waitProcedure(procExec, procId);
387    assertEquals(false, procExec.isRunning());
388    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
389    ProcedureTestingUtility.restart(procExec);
390    while (!procExec.isStarted(procId)) {
391      Thread.sleep(250);
392    }
393    assertEquals(true, procExec.isRunning());
394
395    // After recovery, remote locks should reacquire locks and function normally.
396    assertTrue(awaitForLocked(procId, 2000));
397    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
398    sendHeartbeatAndCheckLocked(procId, true);
399    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
400    sendHeartbeatAndCheckLocked(procId, true);
401    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT / 2);
402    sendHeartbeatAndCheckLocked(procId, false);
403    ProcedureTestingUtility.waitProcedure(procExec, procId);
404    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
405  }
406
407  @Test
408  public void testRemoteTableLockRecovery() throws Exception {
409    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
410    testRemoteLockRecovery(lock);
411  }
412
413  @Test
414  public void testRemoteNamespaceLockRecovery() throws Exception {
415    LockRequest lock = getNamespaceLock(namespace, testMethodName);
416    testRemoteLockRecovery(lock);
417  }
418
419  @Test
420  public void testRemoteRegionLockRecovery() throws Exception {
421    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
422    testRemoteLockRecovery(lock);
423  }
424
425  @Test
426  public void testLocalMasterLockRecovery() throws Exception {
427    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
428    CountDownLatch latch = new CountDownLatch(1);
429    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(), TableName.valueOf("table"),
430      LockType.EXCLUSIVE, "desc", latch);
431    procExec.submitProcedure(lockProc);
432    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
433
434    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
435    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
436    assertEquals(false, procExec.isRunning());
437    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
438    // remove zk lock node otherwise recovered lock will keep waiting on it.
439    ProcedureTestingUtility.restart(procExec);
440    while (!procExec.isStarted(lockProc.getProcId())) {
441      Thread.sleep(250);
442    }
443    assertEquals(true, procExec.isRunning());
444    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
445    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
446    assertTrue(result != null && !result.isFailed());
447    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
448  }
449}