001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.locking.LockServiceClient;
038import org.apache.hadoop.hbase.master.MasterRpcServices;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
040import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
041import org.apache.hadoop.hbase.procedure2.LockType;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.hamcrest.core.IsInstanceOf;
049import org.hamcrest.core.StringStartsWith;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.ExpectedException;
059import org.junit.rules.TestName;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
070
071@Category({MasterTests.class, MediumTests.class})
072public class TestLockProcedure {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestLockProcedure.class);
077
078  @Rule
079  public final ExpectedException exception = ExpectedException.none();
080  @Rule
081  public TestName testName = new TestName();
082  // crank this up if this test turns out to be flaky.
083  private static final int HEARTBEAT_TIMEOUT = 2000;
084  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
087  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
088  private static MasterRpcServices masterRpcService;
089  private static ProcedureExecutor<MasterProcedureEnv> procExec;
090
091  private static String namespace = "namespace";
092  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
093  private static List<RegionInfo> tableRegions1;
094  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
095  private static List<RegionInfo> tableRegions2;
096
097  private String testMethodName;
098
099  private static void setupConf(Configuration conf) {
100    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
101    conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
102    conf.setBoolean("hbase.procedure.check.owner.set", false);  // since rpc user will be null
103    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
104    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
105  }
106
107  @BeforeClass
108  public static void setupCluster() throws Exception {
109    setupConf(UTIL.getConfiguration());
110    UTIL.startMiniCluster(1);
111    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
112    UTIL.createTable(tableName1,
113        new byte[][]{ Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
114    UTIL.createTable(tableName2,
115        new byte[][]{Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
116    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
117    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
118    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
119    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
120    assert tableRegions1.size() > 0;
121    assert tableRegions2.size() > 0;
122  }
123
124  @AfterClass
125  public static void cleanupTest() throws Exception {
126    try {
127      UTIL.shutdownMiniCluster();
128    } catch (Exception e) {
129      LOG.warn("failure shutting down cluster", e);
130    }
131  }
132
133  @Before
134  public void setup() throws Exception {
135    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
136    testMethodName = testName.getMethodName();
137  }
138
139  @After
140  public void tearDown() throws Exception {
141    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
142    // Kill all running procedures.
143    for (Procedure<?> proc : procExec.getProcedures()) {
144      procExec.abort(proc.getProcId());
145      ProcedureTestingUtility.waitProcedure(procExec, proc);
146    }
147    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
148  }
149
150  private LockRequest getNamespaceLock(String namespace, String description) {
151    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
152        namespace, null, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
153  }
154
155  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
156    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
157        null, tableName, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
158  }
159
160  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
161    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
162        null, null, regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
163  }
164
165  private void validateLockRequestException(LockRequest lockRequest, String message)
166      throws Exception {
167    exception.expect(ServiceException.class);
168    exception.expectCause(IsInstanceOf.instanceOf(DoNotRetryIOException.class));
169    exception.expectMessage(
170        StringStartsWith.startsWith("org.apache.hadoop.hbase.DoNotRetryIOException: "
171            + "java.lang.IllegalArgumentException: " + message));
172    masterRpcService.requestLock(null, lockRequest);
173  }
174
175  @Test
176  public void testLockRequestValidationEmptyDescription() throws Exception {
177    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
178  }
179
180  @Test
181  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
182    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
183  }
184
185  @Test
186  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
187    List<RegionInfo> regions = new ArrayList<>();
188    regions.addAll(tableRegions1);
189    regions.addAll(tableRegions2);
190    validateLockRequestException(getRegionLock(regions, "desc"),
191        "All regions should be from same table");
192  }
193
194  /**
195   * Returns immediately if the lock is acquired.
196   * @throws TimeoutException if lock couldn't be acquired.
197   */
198  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
199    long deadline = System.currentTimeMillis() + timeoutInMs;
200    while (System.currentTimeMillis() < deadline) {
201      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
202          LockHeartbeatRequest.newBuilder().setProcId(procId).build());
203      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
204        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
205        LOG.debug(String.format("Proc id %s acquired lock.", procId));
206        return true;
207      }
208      Thread.sleep(100);
209    }
210    return false;
211  }
212
213  private long queueLock(LockRequest lockRequest) throws ServiceException {
214    LockResponse response = masterRpcService.requestLock(null, lockRequest);
215    return response.getProcId();
216  }
217
218  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
219    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
220        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
221    if (isLocked) {
222      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
223    } else {
224      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
225    }
226    LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
227  }
228
229  private void releaseLock(long procId) throws ServiceException {
230    masterRpcService.lockHeartbeat(null,
231        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
232  }
233
234  @Test
235  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
236    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
237    final long procId = queueLock(lock);
238    assertTrue(awaitForLocked(procId, 2000));
239    Thread.sleep(HEARTBEAT_TIMEOUT /2);
240    sendHeartbeatAndCheckLocked(procId, true);
241    Thread.sleep(HEARTBEAT_TIMEOUT /2);
242    sendHeartbeatAndCheckLocked(procId, true);
243    Thread.sleep(HEARTBEAT_TIMEOUT /2);
244    sendHeartbeatAndCheckLocked(procId, true);
245    releaseLock(procId);
246    sendHeartbeatAndCheckLocked(procId, false);
247    ProcedureTestingUtility.waitProcedure(procExec, procId);
248    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
249  }
250
251  @Test
252  public void testAbort() throws Exception {
253    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
254    final long procId = queueLock(lock);
255    assertTrue(awaitForLocked(procId, 2000));
256    assertTrue(procExec.abort(procId));
257    sendHeartbeatAndCheckLocked(procId, false);
258    ProcedureTestingUtility.waitProcedure(procExec, procId);
259    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
260  }
261
262  @Test
263  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
264    LockRequest lock = getNamespaceLock(namespace, testMethodName);
265    final long procId = queueLock(lock);
266    assertTrue(awaitForLocked(procId, 2000));
267    Thread.sleep(HEARTBEAT_TIMEOUT /2);
268    sendHeartbeatAndCheckLocked(procId, true);
269    Thread.sleep(HEARTBEAT_TIMEOUT /2);
270    sendHeartbeatAndCheckLocked(procId, true);
271    Thread.sleep(HEARTBEAT_TIMEOUT /2);
272    sendHeartbeatAndCheckLocked(procId, true);
273    releaseLock(procId);
274    sendHeartbeatAndCheckLocked(procId, false);
275    ProcedureTestingUtility.waitProcedure(procExec, procId);
276    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
277  }
278
279  @Test
280  public void testTimeout() throws Exception {
281    LockRequest lock = getNamespaceLock(namespace, testMethodName);
282    final long procId = queueLock(lock);
283    assertTrue(awaitForLocked(procId, 2000));
284    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
285    sendHeartbeatAndCheckLocked(procId, true);
286    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
287    sendHeartbeatAndCheckLocked(procId, true);
288    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
289    sendHeartbeatAndCheckLocked(procId, false);
290    ProcedureTestingUtility.waitProcedure(procExec, procId);
291    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
292  }
293
  /**
   * Queues a namespace lock, two table locks and two region locks and checks the order in
   * which they are granted: the table locks only after the namespace lock is released, and
   * each table's region lock only after that table's lock is released.
   */
  @Test
  public void testMultipleLocks() throws Exception {
    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
    LockRequest tableLock2 =  getTableExclusiveLock(tableName2, testMethodName);
    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
    // Acquire namespace lock, then queue other locks.
    long nsProcId = queueLock(nsLock);
    assertTrue(awaitForLocked(nsProcId, 2000));
    // Remember when the namespace lock was seen as acquired; used below to size the sleep.
    long start = System.currentTimeMillis();
    sendHeartbeatAndCheckLocked(nsProcId, true);
    long table1ProcId = queueLock(tableLock1);
    long table2ProcId = queueLock(tableLock2);
    long regions1ProcId = queueLock(regionsLock1);
    long regions2ProcId = queueLock(regionsLock2);

    // Assert tables & region locks are waiting because of namespace lock.
    long now = System.currentTimeMillis();
    // Sleep at most half the HEARTBEAT_TIMEOUT, and never past 10 msec before a full
    // HEARTBEAT_TIMEOUT has elapsed since 'start' (clamped at 0), in case queueing the locks
    // above already took more than half the HEARTBEAT_TIMEOUT.
    Thread.sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT-(now-start)-10, 0)));
    sendHeartbeatAndCheckLocked(nsProcId, true);
    sendHeartbeatAndCheckLocked(table1ProcId, false);
    sendHeartbeatAndCheckLocked(table2ProcId, false);
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release namespace lock and assert tables locks are acquired but not region lock
    releaseLock(nsProcId);
    assertTrue(awaitForLocked(table1ProcId, 2000));
    assertTrue(awaitForLocked(table2ProcId, 2000));
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release table1 lock and assert its region lock is acquired, while table2 stays locked
    // and table2's region lock keeps waiting.
    releaseLock(table1ProcId);
    sendHeartbeatAndCheckLocked(table1ProcId, false);
    assertTrue(awaitForLocked(regions1ProcId, 2000));
    sendHeartbeatAndCheckLocked(table2ProcId, true);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);

    // Release table2 lock and assert its region lock is acquired; both region locks are now held.
    releaseLock(table2ProcId);
    sendHeartbeatAndCheckLocked(table2ProcId, false);
    assertTrue(awaitForLocked(regions2ProcId, 2000));
    sendHeartbeatAndCheckLocked(regions1ProcId, true);
    sendHeartbeatAndCheckLocked(regions2ProcId, true);

    // Release region locks.
    releaseLock(regions1ProcId);
    releaseLock(regions2ProcId);
    sendHeartbeatAndCheckLocked(regions1ProcId, false);
    sendHeartbeatAndCheckLocked(regions2ProcId, false);
    // Wait for every lock procedure to finish, then check that none of them failed.
    ProcedureTestingUtility.waitAllProcedures(procExec);
    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
  }
354
355  // Test latch is decreased in count when lock is acquired.
356  @Test
357  public void testLatch() throws Exception {
358    CountDownLatch latch = new CountDownLatch(1);
359    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
360    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
361        TableName.valueOf("table"),
362        org.apache.hadoop.hbase.procedure2.LockType.EXCLUSIVE, "desc", latch);
363    procExec.submitProcedure(lockProc);
364    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
365    releaseLock(lockProc.getProcId());
366    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
367    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
368  }
369
370  // LockProcedures with latch are considered local locks.
371  @Test
372  public void testLocalLockTimeout() throws Exception {
373    CountDownLatch latch = new CountDownLatch(1);
374    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
375    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
376        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
377    procExec.submitProcedure(lockProc);
378    assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
379    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
380    assertTrue(lockProc.isLocked());
381    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
382    assertFalse(lockProc.isLocked());
383    releaseLock(lockProc.getProcId());
384    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
385    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
386  }
387
388  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
389    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
390    final long procId = queueLock(lock);
391    assertTrue(awaitForLocked(procId, 2000));
392
393    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
394    ProcedureTestingUtility.waitProcedure(procExec, procId);
395    assertEquals(false, procExec.isRunning());
396    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
397    ProcedureTestingUtility.restart(procExec);
398    while (!procExec.isStarted(procId)) {
399      Thread.sleep(250);
400    }
401    assertEquals(true, procExec.isRunning());
402
403    // After recovery, remote locks should reacquire locks and function normally.
404    assertTrue(awaitForLocked(procId, 2000));
405    Thread.sleep(HEARTBEAT_TIMEOUT/2);
406    sendHeartbeatAndCheckLocked(procId, true);
407    Thread.sleep(HEARTBEAT_TIMEOUT/2);
408    sendHeartbeatAndCheckLocked(procId, true);
409    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT/2);
410    sendHeartbeatAndCheckLocked(procId, false);
411    ProcedureTestingUtility.waitProcedure(procExec, procId);
412    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
413  }
414
415  @Test
416  public void testRemoteTableLockRecovery() throws Exception {
417    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
418    testRemoteLockRecovery(lock);
419  }
420
421  @Test
422  public void testRemoteNamespaceLockRecovery() throws Exception {
423    LockRequest lock = getNamespaceLock(namespace, testMethodName);
424    testRemoteLockRecovery(lock);
425  }
426
427  @Test
428  public void testRemoteRegionLockRecovery() throws Exception {
429    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
430    testRemoteLockRecovery(lock);
431  }
432
433  @Test
434  public void testLocalMasterLockRecovery() throws Exception {
435    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
436    CountDownLatch latch = new CountDownLatch(1);
437    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
438        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
439    procExec.submitProcedure(lockProc);
440    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
441
442    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
443    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
444    assertEquals(false, procExec.isRunning());
445    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
446    // remove zk lock node otherwise recovered lock will keep waiting on it.
447    ProcedureTestingUtility.restart(procExec);
448    while (!procExec.isStarted(lockProc.getProcId())) {
449      Thread.sleep(250);
450    }
451    assertEquals(true, procExec.isRunning());
452    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
453    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
454    assertTrue(result != null && !result.isFailed());
455    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
456  }
457}