001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 029import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure; 030import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; 031import org.apache.hadoop.hbase.procedure2.Procedure; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.junit.jupiter.api.AfterEach; 035import org.junit.jupiter.api.BeforeEach; 036import org.junit.jupiter.api.Tag; 037import org.junit.jupiter.api.Test; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@Tag(MasterTests.TAG) 042@Tag(MediumTests.TAG) 043public class TestMasterProcedureSchedulerConcurrency { 044 045 private static final Logger LOG = 046 LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class); 047 048 private MasterProcedureScheduler queue; 049 050 @BeforeEach 051 public void setUp() throws IOException { 052 queue = new MasterProcedureScheduler(pid -> null); 053 queue.start(); 054 } 055 056 @AfterEach 057 public void tearDown() throws IOException { 058 assertEquals(0, queue.size(), "proc-queue expected to be empty"); 059 queue.stop(); 060 queue.clear(); 061 } 062 063 @Test 064 public void testConcurrentPeerOperations() throws Exception { 065 TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue); 066 067 int NUM_ITEMS = 10; 068 int NUM_PEERS = 5; 069 AtomicInteger opsCount = new AtomicInteger(0); 070 for (int i = 0; i < NUM_PEERS; ++i) { 071 String peerId = String.format("test-peer-%04d", i); 072 for (int j = 1; j < NUM_ITEMS; ++j) { 073 procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD)); 074 opsCount.incrementAndGet(); 075 } 076 } 077 assertEquals(opsCount.get(), queue.size()); 078 079 Thread[] threads = new Thread[NUM_PEERS * 2]; 080 HashSet<String> concurrentPeers = new HashSet<>(); 081 ArrayList<String> failures = new ArrayList<>(); 082 AtomicInteger concurrentCount = new AtomicInteger(0); 083 for (int i = 0; i < threads.length; ++i) { 084 threads[i] = new Thread() { 085 @Override 086 public void run() { 087 while (opsCount.get() > 0) { 088 try { 089 TestPeerProcedure proc = procSet.acquire(); 090 if (proc == null) { 091 queue.signalAll(); 092 if (opsCount.get() > 0) { 093 continue; 094 } 095 break; 096 } 097 098 String peerId = proc.getPeerId(); 099 synchronized (concurrentPeers) { 100 assertTrue(concurrentPeers.add(peerId), "unexpected concurrency on " + peerId); 101 } 102 assertTrue(opsCount.decrementAndGet() >= 0); 103 104 try { 105 long procId = proc.getProcId(); 106 int concurrent = concurrentCount.incrementAndGet(); 107 assertTrue(concurrent >= 1 && concurrent <= NUM_PEERS, 108 "inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_PEERS); 109 LOG.debug( 110 "[S] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent); 111 Thread.sleep(2000); 112 concurrent = concurrentCount.decrementAndGet(); 113 LOG.debug( 114 "[E] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent); 115 assertTrue(concurrent < NUM_PEERS, "dec-concurrent=" + concurrent); 116 } finally { 117 synchronized (concurrentPeers) { 118 assertTrue(concurrentPeers.remove(peerId)); 119 } 120 procSet.release(proc); 121 } 122 } catch (Throwable e) { 123 LOG.error("Failed " + e.getMessage(), e); 124 synchronized (failures) { 125 failures.add(e.getMessage()); 126 } 127 } finally { 128 queue.signalAll(); 129 } 130 } 131 } 132 }; 133 threads[i].start(); 134 } 135 136 for (int i = 0; i < threads.length; ++i) { 137 threads[i].join(); 138 } 139 assertTrue(failures.isEmpty(), failures.toString()); 140 assertEquals(0, opsCount.get()); 141 assertEquals(0, queue.size()); 142 } 143 144 /** 145 * Verify that "write" operations for a single table are serialized, but different tables can be 146 * executed in parallel. 147 */ 148 @Test 149 public void testConcurrentWriteOps() throws Exception { 150 final TestTableProcSet procSet = new TestTableProcSet(queue); 151 152 final int NUM_ITEMS = 10; 153 final int NUM_TABLES = 4; 154 final AtomicInteger opsCount = new AtomicInteger(0); 155 for (int i = 0; i < NUM_TABLES; ++i) { 156 TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); 157 for (int j = 1; j < NUM_ITEMS; ++j) { 158 procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, 159 TableProcedureInterface.TableOperationType.EDIT)); 160 opsCount.incrementAndGet(); 161 } 162 } 163 assertEquals(opsCount.get(), queue.size()); 164 165 final Thread[] threads = new Thread[NUM_TABLES * 2]; 166 final HashSet<TableName> concurrentTables = new HashSet<>(); 167 final ArrayList<String> failures = new ArrayList<>(); 168 final AtomicInteger concurrentCount = new AtomicInteger(0); 169 for (int i = 0; i < threads.length; ++i) { 170 threads[i] = new Thread() { 171 @Override 172 public void run() { 173 while (opsCount.get() > 0) { 174 try { 175 Procedure proc = procSet.acquire(); 176 if (proc == null) { 177 queue.signalAll(); 178 if (opsCount.get() > 0) { 179 continue; 180 } 181 break; 182 } 183 184 TableName tableId = procSet.getTableName(proc); 185 synchronized (concurrentTables) { 186 assertTrue(concurrentTables.add(tableId), "unexpected concurrency on " + tableId); 187 } 188 assertTrue(opsCount.decrementAndGet() >= 0); 189 try { 190 long procId = proc.getProcId(); 191 int concurrent = concurrentCount.incrementAndGet(); 192 assertTrue(concurrent >= 1 && concurrent <= NUM_TABLES, 193 "inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_TABLES); 194 LOG.debug( 195 "[S] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent); 196 Thread.sleep(2000); 197 concurrent = concurrentCount.decrementAndGet(); 198 LOG.debug( 199 "[E] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent); 200 assertTrue(concurrent < NUM_TABLES, "dec-concurrent=" + concurrent); 201 } finally { 202 synchronized (concurrentTables) { 203 assertTrue(concurrentTables.remove(tableId)); 204 } 205 procSet.release(proc); 206 } 207 } catch (Throwable e) { 208 LOG.error("Failed " + e.getMessage(), e); 209 synchronized (failures) { 210 failures.add(e.getMessage()); 211 } 212 } finally { 213 queue.signalAll(); 214 } 215 } 216 } 217 }; 218 threads[i].start(); 219 } 220 for (int i = 0; i < threads.length; ++i) { 221 threads[i].join(); 222 } 223 assertTrue(failures.isEmpty(), failures.toString()); 224 assertEquals(0, opsCount.get()); 225 assertEquals(0, queue.size()); 226 227 for (int i = 1; i <= NUM_TABLES; ++i) { 228 final TableName table = TableName.valueOf(String.format("testtb-%04d", i)); 229 final TestTableProcedure dummyProc = 230 new TestTableProcedure(100, table, TableProcedureInterface.TableOperationType.DELETE); 231 assertTrue(queue.markTableAsDeleted(table, dummyProc), 232 "queue should be deleted, table=" + table); 233 } 234 } 235 236 @Test 237 public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception { 238 // Make sure the tool does not get stuck 239 MasterProcedureSchedulerPerformanceEvaluation.main(new String[] { "-num_ops", "1000" }); 240 } 241 242 public static class TestTableProcSet { 243 private final MasterProcedureScheduler queue; 244 245 public TestTableProcSet(final MasterProcedureScheduler queue) { 246 this.queue = queue; 247 } 248 249 public void addBack(Procedure proc) { 250 queue.addBack(proc); 251 } 252 253 public void addFront(Procedure proc) { 254 queue.addFront(proc); 255 } 256 257 public Procedure acquire() { 258 Procedure proc = null; 259 boolean waiting = true; 260 while (waiting && queue.size() > 0) { 261 proc = queue.poll(100000000L); 262 if (proc == null) { 263 continue; 264 } 265 switch (getTableOperationType(proc)) { 266 case CREATE: 267 case DELETE: 268 case EDIT: 269 waiting = queue.waitTableExclusiveLock(proc, getTableName(proc)); 270 break; 271 case READ: 272 waiting = queue.waitTableSharedLock(proc, getTableName(proc)); 273 break; 274 default: 275 throw new UnsupportedOperationException(); 276 } 277 } 278 return proc; 279 } 280 281 public void release(Procedure proc) { 282 switch (getTableOperationType(proc)) { 283 case CREATE: 284 case DELETE: 285 case EDIT: 286 queue.wakeTableExclusiveLock(proc, getTableName(proc)); 287 break; 288 case READ: 289 queue.wakeTableSharedLock(proc, getTableName(proc)); 290 break; 291 default: 292 throw new UnsupportedOperationException(); 293 } 294 queue.completionCleanup(proc); 295 } 296 297 public TableName getTableName(Procedure proc) { 298 return ((TableProcedureInterface) proc).getTableName(); 299 } 300 301 public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { 302 return ((TableProcedureInterface) proc).getTableOperationType(); 303 } 304 } 305 306 public static class TestPeerProcedureSet { 307 private final MasterProcedureScheduler queue; 308 309 public TestPeerProcedureSet(final MasterProcedureScheduler queue) { 310 this.queue = queue; 311 } 312 313 public void addBack(TestPeerProcedure proc) { 314 queue.addBack(proc); 315 } 316 317 public TestPeerProcedure acquire() { 318 TestPeerProcedure proc = null; 319 boolean waiting = true; 320 while (waiting && queue.size() > 0) { 321 proc = (TestPeerProcedure) queue.poll(100000000L); 322 if (proc == null) { 323 continue; 324 } 325 switch (proc.getPeerOperationType()) { 326 case ADD: 327 case REMOVE: 328 case ENABLE: 329 case DISABLE: 330 case UPDATE_CONFIG: 331 waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId()); 332 break; 333 case REFRESH: 334 waiting = false; 335 break; 336 default: 337 throw new UnsupportedOperationException(); 338 } 339 } 340 return proc; 341 } 342 343 public void release(TestPeerProcedure proc) { 344 switch (proc.getPeerOperationType()) { 345 case ADD: 346 case REMOVE: 347 case ENABLE: 348 case DISABLE: 349 case UPDATE_CONFIG: 350 queue.wakePeerExclusiveLock(proc, proc.getPeerId()); 351 break; 352 case REFRESH: 353 break; 354 default: 355 throw new UnsupportedOperationException(); 356 } 357 } 358 } 359}