001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.DoNotRetryIOException;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.locking.LockServiceClient;
037import org.apache.hadoop.hbase.master.MasterRpcServices;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
040import org.apache.hadoop.hbase.procedure2.LockType;
041import org.apache.hadoop.hbase.procedure2.Procedure;
042import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.hamcrest.core.IsInstanceOf;
048import org.hamcrest.core.StringStartsWith;
049import org.junit.After;
050import org.junit.AfterClass;
051import org.junit.Before;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.rules.ExpectedException;
058import org.junit.rules.TestName;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
067
068@Category({MasterTests.class, LargeTests.class})
069public class TestLockProcedure {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestLockProcedure.class);
074
075  @Rule
076  public final ExpectedException exception = ExpectedException.none();
077  @Rule
078  public TestName testName = new TestName();
079  // crank this up if this test turns out to be flaky.
080  private static final int HEARTBEAT_TIMEOUT = 2000;
081  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
084  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
085  private static MasterRpcServices masterRpcService;
086  private static ProcedureExecutor<MasterProcedureEnv> procExec;
087
088  private static String namespace = "namespace";
089  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
090  private static List<RegionInfo> tableRegions1;
091  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
092  private static List<RegionInfo> tableRegions2;
093
094  private String testMethodName;
095
096  private static void setupConf(Configuration conf) {
097    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
098    conf.setBoolean("hbase.procedure.check.owner.set", false);  // since rpc user will be null
099    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
100    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
101  }
102
103  @BeforeClass
104  public static void setupCluster() throws Exception {
105    setupConf(UTIL.getConfiguration());
106    UTIL.startMiniCluster(1);
107    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
108    UTIL.createTable(tableName1,
109        new byte[][]{ Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
110    UTIL.createTable(tableName2,
111        new byte[][]{Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
112    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
113    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
114    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
115    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
116    assert tableRegions1.size() > 0;
117    assert tableRegions2.size() > 0;
118  }
119
120  @AfterClass
121  public static void cleanupTest() throws Exception {
122    try {
123      UTIL.shutdownMiniCluster();
124    } catch (Exception e) {
125      LOG.warn("failure shutting down cluster", e);
126    }
127  }
128
129  @Before
130  public void setup() throws Exception {
131    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
132    testMethodName = testName.getMethodName();
133  }
134
135  @After
136  public void tearDown() throws Exception {
137    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
138    // Kill all running procedures.
139    for (Procedure<?> proc : procExec.getProcedures()) {
140      procExec.abort(proc.getProcId());
141      ProcedureTestingUtility.waitProcedure(procExec, proc);
142    }
143    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
144  }
145
146  private LockRequest getNamespaceLock(String namespace, String description) {
147    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
148        namespace, null, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
149  }
150
151  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
152    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
153        null, tableName, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
154  }
155
156  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
157    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
158        null, null, regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
159  }
160
161  private void validateLockRequestException(LockRequest lockRequest, String message)
162      throws Exception {
163    exception.expect(ServiceException.class);
164    exception.expectCause(IsInstanceOf.instanceOf(DoNotRetryIOException.class));
165    exception.expectMessage(
166        StringStartsWith.startsWith("org.apache.hadoop.hbase.DoNotRetryIOException: "
167            + "java.lang.IllegalArgumentException: " + message));
168    masterRpcService.requestLock(null, lockRequest);
169  }
170
171  @Test
172  public void testLockRequestValidationEmptyDescription() throws Exception {
173    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
174  }
175
176  @Test
177  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
178    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
179  }
180
181  @Test
182  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
183    List<RegionInfo> regions = new ArrayList<>();
184    regions.addAll(tableRegions1);
185    regions.addAll(tableRegions2);
186    validateLockRequestException(getRegionLock(regions, "desc"),
187        "All regions should be from same table");
188  }
189
190  /**
191   * Returns immediately if the lock is acquired.
192   * @throws TimeoutException if lock couldn't be acquired.
193   */
194  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
195    long deadline = System.currentTimeMillis() + timeoutInMs;
196    while (System.currentTimeMillis() < deadline) {
197      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
198          LockHeartbeatRequest.newBuilder().setProcId(procId).build());
199      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
200        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
201        LOG.debug(String.format("Proc id %s acquired lock.", procId));
202        return true;
203      }
204      Thread.sleep(100);
205    }
206    return false;
207  }
208
209  private long queueLock(LockRequest lockRequest) throws ServiceException {
210    LockResponse response = masterRpcService.requestLock(null, lockRequest);
211    return response.getProcId();
212  }
213
214  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
215    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
216        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
217    if (isLocked) {
218      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
219    } else {
220      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
221    }
222    LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
223  }
224
225  private void releaseLock(long procId) throws ServiceException {
226    masterRpcService.lockHeartbeat(null,
227        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
228  }
229
230  @Test
231  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
232    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
233    final long procId = queueLock(lock);
234    assertTrue(awaitForLocked(procId, 2000));
235    Thread.sleep(HEARTBEAT_TIMEOUT /2);
236    sendHeartbeatAndCheckLocked(procId, true);
237    Thread.sleep(HEARTBEAT_TIMEOUT /2);
238    sendHeartbeatAndCheckLocked(procId, true);
239    Thread.sleep(HEARTBEAT_TIMEOUT /2);
240    sendHeartbeatAndCheckLocked(procId, true);
241    releaseLock(procId);
242    sendHeartbeatAndCheckLocked(procId, false);
243    ProcedureTestingUtility.waitProcedure(procExec, procId);
244    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
245  }
246
247  @Test
248  public void testAbort() throws Exception {
249    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
250    final long procId = queueLock(lock);
251    assertTrue(awaitForLocked(procId, 2000));
252    assertTrue(procExec.abort(procId));
253    sendHeartbeatAndCheckLocked(procId, false);
254    ProcedureTestingUtility.waitProcedure(procExec, procId);
255    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
256  }
257
258  @Test
259  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
260    LockRequest lock = getNamespaceLock(namespace, testMethodName);
261    final long procId = queueLock(lock);
262    assertTrue(awaitForLocked(procId, 2000));
263    Thread.sleep(HEARTBEAT_TIMEOUT /2);
264    sendHeartbeatAndCheckLocked(procId, true);
265    Thread.sleep(HEARTBEAT_TIMEOUT /2);
266    sendHeartbeatAndCheckLocked(procId, true);
267    Thread.sleep(HEARTBEAT_TIMEOUT /2);
268    sendHeartbeatAndCheckLocked(procId, true);
269    releaseLock(procId);
270    sendHeartbeatAndCheckLocked(procId, false);
271    ProcedureTestingUtility.waitProcedure(procExec, procId);
272    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
273  }
274
275  @Test
276  public void testTimeout() throws Exception {
277    LockRequest lock = getNamespaceLock(namespace, testMethodName);
278    final long procId = queueLock(lock);
279    assertTrue(awaitForLocked(procId, 2000));
280    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
281    sendHeartbeatAndCheckLocked(procId, true);
282    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
283    sendHeartbeatAndCheckLocked(procId, true);
284    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
285    sendHeartbeatAndCheckLocked(procId, false);
286    ProcedureTestingUtility.waitProcedure(procExec, procId);
287    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
288  }
289
290  @Test
291  public void testMultipleLocks() throws Exception {
292    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
293    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
294    LockRequest tableLock2 =  getTableExclusiveLock(tableName2, testMethodName);
295    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
296    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
297    // Acquire namespace lock, then queue other locks.
298    long nsProcId = queueLock(nsLock);
299    assertTrue(awaitForLocked(nsProcId, 2000));
300    long start = System.currentTimeMillis();
301    sendHeartbeatAndCheckLocked(nsProcId, true);
302    long table1ProcId = queueLock(tableLock1);
303    long table2ProcId = queueLock(tableLock2);
304    long regions1ProcId = queueLock(regionsLock1);
305    long regions2ProcId = queueLock(regionsLock2);
306
307    // Assert tables & region locks are waiting because of namespace lock.
308    long now = System.currentTimeMillis();
309    // leave extra 10 msec in case more than half the HEARTBEAT_TIMEOUT has passed
310    Thread.sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT-(now-start)-10, 0)));
311    sendHeartbeatAndCheckLocked(nsProcId, true);
312    sendHeartbeatAndCheckLocked(table1ProcId, false);
313    sendHeartbeatAndCheckLocked(table2ProcId, false);
314    sendHeartbeatAndCheckLocked(regions1ProcId, false);
315    sendHeartbeatAndCheckLocked(regions2ProcId, false);
316
317    // Release namespace lock and assert tables locks are acquired but not region lock
318    releaseLock(nsProcId);
319    assertTrue(awaitForLocked(table1ProcId, 2000));
320    assertTrue(awaitForLocked(table2ProcId, 2000));
321    sendHeartbeatAndCheckLocked(regions1ProcId, false);
322    sendHeartbeatAndCheckLocked(regions2ProcId, false);
323
324    // Release table1 lock and assert region lock is acquired.
325    releaseLock(table1ProcId);
326    sendHeartbeatAndCheckLocked(table1ProcId, false);
327    assertTrue(awaitForLocked(regions1ProcId, 2000));
328    sendHeartbeatAndCheckLocked(table2ProcId, true);
329    sendHeartbeatAndCheckLocked(regions2ProcId, false);
330
331    // Release table2 lock and assert region lock is acquired.
332    releaseLock(table2ProcId);
333    sendHeartbeatAndCheckLocked(table2ProcId, false);
334    assertTrue(awaitForLocked(regions2ProcId, 2000));
335    sendHeartbeatAndCheckLocked(regions1ProcId, true);
336    sendHeartbeatAndCheckLocked(regions2ProcId, true);
337
338    // Release region locks.
339    releaseLock(regions1ProcId);
340    releaseLock(regions2ProcId);
341    sendHeartbeatAndCheckLocked(regions1ProcId, false);
342    sendHeartbeatAndCheckLocked(regions2ProcId, false);
343    ProcedureTestingUtility.waitAllProcedures(procExec);
344    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
345    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
346    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
347    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
348    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
349  }
350
351  // Test latch is decreased in count when lock is acquired.
352  @Test
353  public void testLatch() throws Exception {
354    CountDownLatch latch = new CountDownLatch(1);
355    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
356    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
357        TableName.valueOf("table"),
358        org.apache.hadoop.hbase.procedure2.LockType.EXCLUSIVE, "desc", latch);
359    procExec.submitProcedure(lockProc);
360    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
361    releaseLock(lockProc.getProcId());
362    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
363    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
364  }
365
366  // LockProcedures with latch are considered local locks.
367  @Test
368  public void testLocalLockTimeout() throws Exception {
369    CountDownLatch latch = new CountDownLatch(1);
370    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
371    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
372        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
373    procExec.submitProcedure(lockProc);
374    assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
375    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
376    assertTrue(lockProc.isLocked());
377    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
378    assertFalse(lockProc.isLocked());
379    releaseLock(lockProc.getProcId());
380    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
381    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
382  }
383
384  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
385    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
386    final long procId = queueLock(lock);
387    assertTrue(awaitForLocked(procId, 2000));
388
389    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
390    ProcedureTestingUtility.waitProcedure(procExec, procId);
391    assertEquals(false, procExec.isRunning());
392    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
393    ProcedureTestingUtility.restart(procExec);
394    while (!procExec.isStarted(procId)) {
395      Thread.sleep(250);
396    }
397    assertEquals(true, procExec.isRunning());
398
399    // After recovery, remote locks should reacquire locks and function normally.
400    assertTrue(awaitForLocked(procId, 2000));
401    Thread.sleep(HEARTBEAT_TIMEOUT/2);
402    sendHeartbeatAndCheckLocked(procId, true);
403    Thread.sleep(HEARTBEAT_TIMEOUT/2);
404    sendHeartbeatAndCheckLocked(procId, true);
405    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT/2);
406    sendHeartbeatAndCheckLocked(procId, false);
407    ProcedureTestingUtility.waitProcedure(procExec, procId);
408    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
409  }
410
411  @Test
412  public void testRemoteTableLockRecovery() throws Exception {
413    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
414    testRemoteLockRecovery(lock);
415  }
416
417  @Test
418  public void testRemoteNamespaceLockRecovery() throws Exception {
419    LockRequest lock = getNamespaceLock(namespace, testMethodName);
420    testRemoteLockRecovery(lock);
421  }
422
423  @Test
424  public void testRemoteRegionLockRecovery() throws Exception {
425    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
426    testRemoteLockRecovery(lock);
427  }
428
429  @Test
430  public void testLocalMasterLockRecovery() throws Exception {
431    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
432    CountDownLatch latch = new CountDownLatch(1);
433    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
434        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
435    procExec.submitProcedure(lockProc);
436    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
437
438    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
439    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
440    assertEquals(false, procExec.isRunning());
441    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
442    // remove zk lock node otherwise recovered lock will keep waiting on it.
443    ProcedureTestingUtility.restart(procExec);
444    while (!procExec.isStarted(lockProc.getProcId())) {
445      Thread.sleep(250);
446    }
447    assertEquals(true, procExec.isRunning());
448    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
449    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
450    assertTrue(result != null && !result.isFailed());
451    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
452  }
453}