001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.locking.LockServiceClient;
038import org.apache.hadoop.hbase.master.MasterRpcServices;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
040import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
041import org.apache.hadoop.hbase.procedure2.LockType;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.hamcrest.core.IsInstanceOf;
049import org.hamcrest.core.StringStartsWith;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.ExpectedException;
059import org.junit.rules.TestName;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
070
071@Category({MasterTests.class, MediumTests.class})
072public class TestLockProcedure {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestLockProcedure.class);
077
078  @Rule
079  public final ExpectedException exception = ExpectedException.none();
080  @Rule
081  public TestName testName = new TestName();
082  // crank this up if this test turns out to be flaky.
083  private static final int HEARTBEAT_TIMEOUT = 2000;
084  private static final int LOCAL_LOCKS_TIMEOUT = 4000;
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestLockProcedure.class);
087  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
088  private static MasterRpcServices masterRpcService;
089  private static ProcedureExecutor<MasterProcedureEnv> procExec;
090
091  private static String namespace = "namespace";
092  private static TableName tableName1 = TableName.valueOf(namespace, "table1");
093  private static List<RegionInfo> tableRegions1;
094  private static TableName tableName2 = TableName.valueOf(namespace, "table2");
095  private static List<RegionInfo> tableRegions2;
096
097  private String testMethodName;
098
099  private static void setupConf(Configuration conf) {
100    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
101    conf.setBoolean("hbase.procedure.check.owner.set", false);  // since rpc user will be null
102    conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
103    conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
104  }
105
106  @BeforeClass
107  public static void setupCluster() throws Exception {
108    setupConf(UTIL.getConfiguration());
109    UTIL.startMiniCluster(1);
110    UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
111    UTIL.createTable(tableName1,
112        new byte[][]{ Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
113    UTIL.createTable(tableName2,
114        new byte[][]{Bytes.toBytes("fam")}, new byte[][] {Bytes.toBytes("1")});
115    masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
116    procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
117    tableRegions1 = UTIL.getAdmin().getRegions(tableName1);
118    tableRegions2 = UTIL.getAdmin().getRegions(tableName2);
119    assert tableRegions1.size() > 0;
120    assert tableRegions2.size() > 0;
121  }
122
123  @AfterClass
124  public static void cleanupTest() throws Exception {
125    try {
126      UTIL.shutdownMiniCluster();
127    } catch (Exception e) {
128      LOG.warn("failure shutting down cluster", e);
129    }
130  }
131
132  @Before
133  public void setup() throws Exception {
134    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
135    testMethodName = testName.getMethodName();
136  }
137
138  @After
139  public void tearDown() throws Exception {
140    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
141    // Kill all running procedures.
142    for (Procedure<?> proc : procExec.getProcedures()) {
143      procExec.abort(proc.getProcId());
144      ProcedureTestingUtility.waitProcedure(procExec, proc);
145    }
146    assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
147  }
148
149  private LockRequest getNamespaceLock(String namespace, String description) {
150    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
151        namespace, null, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
152  }
153
154  private LockRequest getTableExclusiveLock(TableName tableName, String description) {
155    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
156        null, tableName, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
157  }
158
159  private LockRequest getRegionLock(List<RegionInfo> regionInfos, String description) {
160    return LockServiceClient.buildLockRequest(LockServiceProtos.LockType.EXCLUSIVE,
161        null, null, regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
162  }
163
164  private void validateLockRequestException(LockRequest lockRequest, String message)
165      throws Exception {
166    exception.expect(ServiceException.class);
167    exception.expectCause(IsInstanceOf.instanceOf(DoNotRetryIOException.class));
168    exception.expectMessage(
169        StringStartsWith.startsWith("org.apache.hadoop.hbase.DoNotRetryIOException: "
170            + "java.lang.IllegalArgumentException: " + message));
171    masterRpcService.requestLock(null, lockRequest);
172  }
173
174  @Test
175  public void testLockRequestValidationEmptyDescription() throws Exception {
176    validateLockRequestException(getNamespaceLock("", ""), "Empty description");
177  }
178
179  @Test
180  public void testLockRequestValidationEmptyNamespaceName() throws Exception {
181    validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
182  }
183
184  @Test
185  public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
186    List<RegionInfo> regions = new ArrayList<>();
187    regions.addAll(tableRegions1);
188    regions.addAll(tableRegions2);
189    validateLockRequestException(getRegionLock(regions, "desc"),
190        "All regions should be from same table");
191  }
192
193  /**
194   * Returns immediately if the lock is acquired.
195   * @throws TimeoutException if lock couldn't be acquired.
196   */
197  private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
198    long deadline = System.currentTimeMillis() + timeoutInMs;
199    while (System.currentTimeMillis() < deadline) {
200      LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
201          LockHeartbeatRequest.newBuilder().setProcId(procId).build());
202      if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
203        assertEquals(HEARTBEAT_TIMEOUT, response.getTimeoutMs());
204        LOG.debug(String.format("Proc id %s acquired lock.", procId));
205        return true;
206      }
207      Thread.sleep(100);
208    }
209    return false;
210  }
211
212  private long queueLock(LockRequest lockRequest) throws ServiceException {
213    LockResponse response = masterRpcService.requestLock(null, lockRequest);
214    return response.getProcId();
215  }
216
217  private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
218    LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
219        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
220    if (isLocked) {
221      assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
222    } else {
223      assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
224    }
225    LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
226  }
227
228  private void releaseLock(long procId) throws ServiceException {
229    masterRpcService.lockHeartbeat(null,
230        LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
231  }
232
233  @Test
234  public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
235    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
236    final long procId = queueLock(lock);
237    assertTrue(awaitForLocked(procId, 2000));
238    Thread.sleep(HEARTBEAT_TIMEOUT /2);
239    sendHeartbeatAndCheckLocked(procId, true);
240    Thread.sleep(HEARTBEAT_TIMEOUT /2);
241    sendHeartbeatAndCheckLocked(procId, true);
242    Thread.sleep(HEARTBEAT_TIMEOUT /2);
243    sendHeartbeatAndCheckLocked(procId, true);
244    releaseLock(procId);
245    sendHeartbeatAndCheckLocked(procId, false);
246    ProcedureTestingUtility.waitProcedure(procExec, procId);
247    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
248  }
249
250  @Test
251  public void testAbort() throws Exception {
252    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
253    final long procId = queueLock(lock);
254    assertTrue(awaitForLocked(procId, 2000));
255    assertTrue(procExec.abort(procId));
256    sendHeartbeatAndCheckLocked(procId, false);
257    ProcedureTestingUtility.waitProcedure(procExec, procId);
258    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
259  }
260
261  @Test
262  public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
263    LockRequest lock = getNamespaceLock(namespace, testMethodName);
264    final long procId = queueLock(lock);
265    assertTrue(awaitForLocked(procId, 2000));
266    Thread.sleep(HEARTBEAT_TIMEOUT /2);
267    sendHeartbeatAndCheckLocked(procId, true);
268    Thread.sleep(HEARTBEAT_TIMEOUT /2);
269    sendHeartbeatAndCheckLocked(procId, true);
270    Thread.sleep(HEARTBEAT_TIMEOUT /2);
271    sendHeartbeatAndCheckLocked(procId, true);
272    releaseLock(procId);
273    sendHeartbeatAndCheckLocked(procId, false);
274    ProcedureTestingUtility.waitProcedure(procExec, procId);
275    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
276  }
277
278  @Test
279  public void testTimeout() throws Exception {
280    LockRequest lock = getNamespaceLock(namespace, testMethodName);
281    final long procId = queueLock(lock);
282    assertTrue(awaitForLocked(procId, 2000));
283    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
284    sendHeartbeatAndCheckLocked(procId, true);
285    Thread.sleep(HEARTBEAT_TIMEOUT / 2);
286    sendHeartbeatAndCheckLocked(procId, true);
287    Thread.sleep(4 * HEARTBEAT_TIMEOUT);
288    sendHeartbeatAndCheckLocked(procId, false);
289    ProcedureTestingUtility.waitProcedure(procExec, procId);
290    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
291  }
292
293  @Test
294  public void testMultipleLocks() throws Exception {
295    LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
296    LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
297    LockRequest tableLock2 =  getTableExclusiveLock(tableName2, testMethodName);
298    LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
299    LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
300    // Acquire namespace lock, then queue other locks.
301    long nsProcId = queueLock(nsLock);
302    assertTrue(awaitForLocked(nsProcId, 2000));
303    long start = System.currentTimeMillis();
304    sendHeartbeatAndCheckLocked(nsProcId, true);
305    long table1ProcId = queueLock(tableLock1);
306    long table2ProcId = queueLock(tableLock2);
307    long regions1ProcId = queueLock(regionsLock1);
308    long regions2ProcId = queueLock(regionsLock2);
309
310    // Assert tables & region locks are waiting because of namespace lock.
311    long now = System.currentTimeMillis();
312    // leave extra 10 msec in case more than half the HEARTBEAT_TIMEOUT has passed
313    Thread.sleep(Math.min(HEARTBEAT_TIMEOUT / 2, Math.max(HEARTBEAT_TIMEOUT-(now-start)-10, 0)));
314    sendHeartbeatAndCheckLocked(nsProcId, true);
315    sendHeartbeatAndCheckLocked(table1ProcId, false);
316    sendHeartbeatAndCheckLocked(table2ProcId, false);
317    sendHeartbeatAndCheckLocked(regions1ProcId, false);
318    sendHeartbeatAndCheckLocked(regions2ProcId, false);
319
320    // Release namespace lock and assert tables locks are acquired but not region lock
321    releaseLock(nsProcId);
322    assertTrue(awaitForLocked(table1ProcId, 2000));
323    assertTrue(awaitForLocked(table2ProcId, 2000));
324    sendHeartbeatAndCheckLocked(regions1ProcId, false);
325    sendHeartbeatAndCheckLocked(regions2ProcId, false);
326
327    // Release table1 lock and assert region lock is acquired.
328    releaseLock(table1ProcId);
329    sendHeartbeatAndCheckLocked(table1ProcId, false);
330    assertTrue(awaitForLocked(regions1ProcId, 2000));
331    sendHeartbeatAndCheckLocked(table2ProcId, true);
332    sendHeartbeatAndCheckLocked(regions2ProcId, false);
333
334    // Release table2 lock and assert region lock is acquired.
335    releaseLock(table2ProcId);
336    sendHeartbeatAndCheckLocked(table2ProcId, false);
337    assertTrue(awaitForLocked(regions2ProcId, 2000));
338    sendHeartbeatAndCheckLocked(regions1ProcId, true);
339    sendHeartbeatAndCheckLocked(regions2ProcId, true);
340
341    // Release region locks.
342    releaseLock(regions1ProcId);
343    releaseLock(regions2ProcId);
344    sendHeartbeatAndCheckLocked(regions1ProcId, false);
345    sendHeartbeatAndCheckLocked(regions2ProcId, false);
346    ProcedureTestingUtility.waitAllProcedures(procExec);
347    ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
348    ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
349    ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
350    ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
351    ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
352  }
353
354  // Test latch is decreased in count when lock is acquired.
355  @Test
356  public void testLatch() throws Exception {
357    CountDownLatch latch = new CountDownLatch(1);
358    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
359    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
360        TableName.valueOf("table"),
361        org.apache.hadoop.hbase.procedure2.LockType.EXCLUSIVE, "desc", latch);
362    procExec.submitProcedure(lockProc);
363    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
364    releaseLock(lockProc.getProcId());
365    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
366    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
367  }
368
369  // LockProcedures with latch are considered local locks.
370  @Test
371  public void testLocalLockTimeout() throws Exception {
372    CountDownLatch latch = new CountDownLatch(1);
373    // MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
374    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
375        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
376    procExec.submitProcedure(lockProc);
377    assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
378    Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
379    assertTrue(lockProc.isLocked());
380    Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
381    assertFalse(lockProc.isLocked());
382    releaseLock(lockProc.getProcId());
383    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
384    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
385  }
386
387  private void testRemoteLockRecovery(LockRequest lock) throws Exception {
388    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
389    final long procId = queueLock(lock);
390    assertTrue(awaitForLocked(procId, 2000));
391
392    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
393    ProcedureTestingUtility.waitProcedure(procExec, procId);
394    assertEquals(false, procExec.isRunning());
395    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
396    ProcedureTestingUtility.restart(procExec);
397    while (!procExec.isStarted(procId)) {
398      Thread.sleep(250);
399    }
400    assertEquals(true, procExec.isRunning());
401
402    // After recovery, remote locks should reacquire locks and function normally.
403    assertTrue(awaitForLocked(procId, 2000));
404    Thread.sleep(HEARTBEAT_TIMEOUT/2);
405    sendHeartbeatAndCheckLocked(procId, true);
406    Thread.sleep(HEARTBEAT_TIMEOUT/2);
407    sendHeartbeatAndCheckLocked(procId, true);
408    Thread.sleep(2 * HEARTBEAT_TIMEOUT + HEARTBEAT_TIMEOUT/2);
409    sendHeartbeatAndCheckLocked(procId, false);
410    ProcedureTestingUtility.waitProcedure(procExec, procId);
411    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
412  }
413
414  @Test
415  public void testRemoteTableLockRecovery() throws Exception {
416    LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
417    testRemoteLockRecovery(lock);
418  }
419
420  @Test
421  public void testRemoteNamespaceLockRecovery() throws Exception {
422    LockRequest lock = getNamespaceLock(namespace, testMethodName);
423    testRemoteLockRecovery(lock);
424  }
425
426  @Test
427  public void testRemoteRegionLockRecovery() throws Exception {
428    LockRequest lock = getRegionLock(tableRegions1, testMethodName);
429    testRemoteLockRecovery(lock);
430  }
431
432  @Test
433  public void testLocalMasterLockRecovery() throws Exception {
434    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
435    CountDownLatch latch = new CountDownLatch(1);
436    LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
437        TableName.valueOf("table"), LockType.EXCLUSIVE, "desc", latch);
438    procExec.submitProcedure(lockProc);
439    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
440
441    // wait for proc Executor to die, then restart it and wait for Lock Procedure to get started.
442    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
443    assertEquals(false, procExec.isRunning());
444    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
445    // remove zk lock node otherwise recovered lock will keep waiting on it.
446    ProcedureTestingUtility.restart(procExec);
447    while (!procExec.isStarted(lockProc.getProcId())) {
448      Thread.sleep(250);
449    }
450    assertEquals(true, procExec.isRunning());
451    ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
452    Procedure<?> result = procExec.getResultOrProcedure(lockProc.getProcId());
453    assertTrue(result != null && !result.isFailed());
454    ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
455  }
456}