001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.greaterThanOrEqualTo; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertThrows; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.verify; 028 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicLong; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.RegionInfoBuilder; 034import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 035import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 036import org.apache.hadoop.hbase.testclassification.MasterTests; 037import org.apache.hadoop.hbase.testclassification.SmallTests; 038import org.apache.hadoop.hbase.util.AtomicUtils; 039import org.apache.hadoop.hbase.util.Threads; 040import org.junit.jupiter.api.BeforeEach; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043 044@Tag(MasterTests.TAG) 045@Tag(SmallTests.TAG) 046public class TestRegionStateNodeLock { 047 048 private final RegionInfo regionInfo = 049 RegionInfoBuilder.newBuilder(TableName.valueOf("test")).build(); 050 051 private RegionStateNodeLock lock; 052 053 @BeforeEach 054 public void setUp() { 055 lock = new RegionStateNodeLock(regionInfo); 056 } 057 058 @Test 059 public void testLockByThread() { 060 assertFalse(lock.isLocked()); 061 assertFalse(lock.isLockedBy(Thread.currentThread())); 062 assertThrows(IllegalMonitorStateException.class, () -> lock.unlock()); 063 lock.lock(); 064 assertTrue(lock.isLocked()); 065 assertTrue(lock.isLockedBy(Thread.currentThread())); 066 assertFalse(lock.isLockedBy(new Object())); 067 // reentrant 068 assertTrue(lock.tryLock()); 069 lock.unlock(); 070 assertTrue(lock.isLocked()); 071 lock.unlock(); 072 assertFalse(lock.isLocked()); 073 } 074 075 @Test 076 public void testLockByProc() throws ProcedureSuspendedException { 077 NoopProcedure<?> proc = new NoopProcedure<Void>(); 078 assertFalse(lock.isLocked()); 079 assertFalse(lock.isLockedBy(proc)); 080 assertThrows(IllegalMonitorStateException.class, () -> lock.unlock(proc)); 081 // here we do not need wake up 082 lock.lock(proc, null); 083 assertTrue(lock.isLocked()); 084 assertTrue(lock.isLockedBy(proc)); 085 // reentrant 086 assertTrue(lock.tryLock(proc)); 087 lock.unlock(proc); 088 assertTrue(lock.isLocked()); 089 assertTrue(lock.isLockedBy(proc)); 090 lock.unlock(proc); 091 assertFalse(lock.isLocked()); 092 assertFalse(lock.isLockedBy(proc)); 093 } 094 095 @Test 096 public void testLockProcThenThread() throws ProcedureSuspendedException { 097 NoopProcedure<?> proc = new NoopProcedure<Void>(); 098 assertFalse(lock.isLocked()); 099 lock.lock(proc, null); 100 assertFalse(lock.tryLock()); 101 assertThrows(IllegalMonitorStateException.class, () -> lock.unlock()); 102 long startNs = System.nanoTime(); 103 new Thread(() -> { 104 Threads.sleepWithoutInterrupt(2000); 105 lock.unlock(proc); 106 }).start(); 107 lock.lock(); 108 long costNs = System.nanoTime() - startNs; 109 assertThat(TimeUnit.NANOSECONDS.toMillis(costNs), greaterThanOrEqualTo(1800L)); 110 assertTrue(lock.isLocked()); 111 lock.unlock(); 112 assertFalse(lock.isLocked()); 113 } 114 115 @Test 116 public void testLockThreadThenProc() throws ProcedureSuspendedException { 117 lock.lock(); 118 NoopProcedure<?> proc = new NoopProcedure<Void>(); 119 Runnable wakeUp = mock(Runnable.class); 120 assertThrows(ProcedureSuspendedException.class, () -> lock.lock(proc, wakeUp)); 121 lock.unlock(); 122 // make sure that we have woken up the procedure, and the lock has been passed 123 verify(wakeUp).run(); 124 assertTrue(lock.isLockedBy(proc)); 125 } 126 127 @Test 128 public void testLockMultiThread() throws InterruptedException { 129 int nThreads = 10; 130 AtomicLong concurrency = new AtomicLong(0); 131 AtomicLong maxConcurrency = new AtomicLong(0); 132 Thread[] threads = new Thread[nThreads]; 133 for (int i = 0; i < nThreads; i++) { 134 threads[i] = new Thread(() -> { 135 for (int j = 0; j < 1000; j++) { 136 lock.lock(); 137 try { 138 long c = concurrency.incrementAndGet(); 139 AtomicUtils.updateMax(maxConcurrency, c); 140 concurrency.decrementAndGet(); 141 } finally { 142 lock.unlock(); 143 } 144 Threads.sleepWithoutInterrupt(1); 145 } 146 }); 147 } 148 for (Thread t : threads) { 149 t.start(); 150 } 151 for (Thread t : threads) { 152 t.join(); 153 } 154 assertEquals(0, concurrency.get()); 155 assertEquals(1, maxConcurrency.get()); 156 assertFalse(lock.isLocked()); 157 } 158}