/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Concurrency tests for {@link MasterProcedureScheduler}. Several worker threads pull procedures
 * from one shared scheduler, and the tests assert that two procedures targeting the same peer (or
 * the same table) are never executed at the same time.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestMasterProcedureSchedulerConcurrency {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMasterProcedureSchedulerConcurrency.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);

  private MasterProcedureScheduler queue;

  /** Creates and starts a fresh scheduler whose procedure lookup always yields {@code null}. */
  @Before
  public void setUp() throws IOException {
    queue = new MasterProcedureScheduler(pid -> null);
    queue.start();
  }

  /** Verifies that the test drained the scheduler, then stops and clears it. */
  @After
  public void tearDown() throws IOException {
    assertEquals("proc-queue expected to be empty", 0, queue.size());
    queue.stop();
    queue.clear();
  }

  /**
   * Verify that exclusive operations for a single peer are serialized, while operations on
   * different peers may run in parallel (at most one per peer at any time).
   */
  @Test
  public void testConcurrentPeerOperations() throws Exception {
    TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue);

    int NUM_ITEMS = 10;
    int NUM_PEERS = 5;
    AtomicInteger opsCount = new AtomicInteger(0);
    for (int i = 0; i < NUM_PEERS; ++i) {
      String peerId = String.format("test-peer-%04d", i);
      // j starts at 1, so NUM_ITEMS - 1 procedures are queued per peer.
      for (int j = 1; j < NUM_ITEMS; ++j) {
        procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD));
        opsCount.incrementAndGet();
      }
    }
    assertEquals(opsCount.get(), queue.size());

    // Twice as many workers as peers, so some workers always have to wait.
    Thread[] threads = new Thread[NUM_PEERS * 2];
    // Peers that currently have a procedure running; guarded by its own monitor.
    HashSet<String> concurrentPeers = new HashSet<>();
    // Messages of every failure seen in a worker; guarded by its own monitor.
    ArrayList<String> failures = new ArrayList<>();
    AtomicInteger concurrentCount = new AtomicInteger(0);
    for (int i = 0; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (opsCount.get() > 0) {
            try {
              TestPeerProcedure proc = procSet.acquire();
              if (proc == null) {
                // Nothing runnable for this worker right now; wake the others and re-check.
                queue.signalAll();
                if (opsCount.get() > 0) {
                  continue;
                }
                break;
              }

              String peerId = proc.getPeerId();
              synchronized (concurrentPeers) {
                assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId));
              }
              assertTrue(opsCount.decrementAndGet() >= 0);

              try {
                long procId = proc.getProcId();
                int concurrent = concurrentCount.incrementAndGet();
                assertTrue("inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_PEERS,
                  concurrent >= 1 && concurrent <= NUM_PEERS);
                LOG.debug("[S] peerId={} procId={} concurrent={}", peerId, procId, concurrent);
                // Simulate work while the peer lock is held.
                Thread.sleep(2000);
                concurrent = concurrentCount.decrementAndGet();
                LOG.debug("[E] peerId={} procId={} concurrent={}", peerId, procId, concurrent);
                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS);
              } finally {
                synchronized (concurrentPeers) {
                  assertTrue(concurrentPeers.remove(peerId));
                }
                procSet.release(proc);
              }
            } catch (Throwable e) {
              // Assertions thrown on worker threads would otherwise be lost; record them so the
              // main thread can fail the test.
              LOG.error("Failed {}", e.getMessage(), e);
              synchronized (failures) {
                failures.add(e.getMessage());
              }
            } finally {
              queue.signalAll();
            }
          }
        }
      };
      threads[i].start();
    }

    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    assertTrue(failures.toString(), failures.isEmpty());
    assertEquals(0, opsCount.get());
    assertEquals(0, queue.size());
  }

  /**
   * Verify that "write" operations for a single table are serialized, but different tables can be
   * executed in parallel.
   */
  @Test
  public void testConcurrentWriteOps() throws Exception {
    final TestTableProcSet procSet = new TestTableProcSet(queue);

    final int NUM_ITEMS = 10;
    final int NUM_TABLES = 4;
    final AtomicInteger opsCount = new AtomicInteger(0);
    for (int i = 0; i < NUM_TABLES; ++i) {
      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
      // j starts at 1, so NUM_ITEMS - 1 procedures are queued per table.
      for (int j = 1; j < NUM_ITEMS; ++j) {
        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
        opsCount.incrementAndGet();
      }
    }
    assertEquals(opsCount.get(), queue.size());

    // Twice as many workers as tables, so some workers always have to wait.
    final Thread[] threads = new Thread[NUM_TABLES * 2];
    // Tables that currently have a procedure running; guarded by its own monitor.
    final HashSet<TableName> concurrentTables = new HashSet<>();
    // Messages of every failure seen in a worker; guarded by its own monitor.
    final ArrayList<String> failures = new ArrayList<>();
    final AtomicInteger concurrentCount = new AtomicInteger(0);
    for (int i = 0; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (opsCount.get() > 0) {
            try {
              Procedure proc = procSet.acquire();
              if (proc == null) {
                // Nothing runnable for this worker right now; wake the others and re-check.
                queue.signalAll();
                if (opsCount.get() > 0) {
                  continue;
                }
                break;
              }

              TableName tableId = procSet.getTableName(proc);
              synchronized (concurrentTables) {
                assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
              }
              assertTrue(opsCount.decrementAndGet() >= 0);
              try {
                long procId = proc.getProcId();
                int concurrent = concurrentCount.incrementAndGet();
                assertTrue("inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_TABLES,
                  concurrent >= 1 && concurrent <= NUM_TABLES);
                LOG.debug("[S] tableId={} procId={} concurrent={}", tableId, procId, concurrent);
                // Simulate work while the table lock is held.
                Thread.sleep(2000);
                concurrent = concurrentCount.decrementAndGet();
                LOG.debug("[E] tableId={} procId={} concurrent={}", tableId, procId, concurrent);
                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
              } finally {
                synchronized (concurrentTables) {
                  assertTrue(concurrentTables.remove(tableId));
                }
                procSet.release(proc);
              }
            } catch (Throwable e) {
              // Assertions thrown on worker threads would otherwise be lost; record them so the
              // main thread can fail the test.
              LOG.error("Failed {}", e.getMessage(), e);
              synchronized (failures) {
                failures.add(e.getMessage());
              }
            } finally {
              queue.signalAll();
            }
          }
        }
      };
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    assertTrue(failures.toString(), failures.isEmpty());
    assertEquals(0, opsCount.get());
    assertEquals(0, queue.size());

    // Use the same index range as the setup loop above (0 .. NUM_TABLES - 1) so that exactly the
    // tables created by this test are checked for removal.
    for (int i = 0; i < NUM_TABLES; ++i) {
      final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
      final TestTableProcedure dummyProc =
        new TestTableProcedure(100, table, TableProcedureInterface.TableOperationType.DELETE);
      assertTrue("queue should be deleted, table=" + table,
        queue.markTableAsDeleted(table, dummyProc));
    }
  }

  @Test
  public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception {
    // Make sure the tool does not get stuck
    MasterProcedureSchedulerPerformanceEvaluation.main(new String[] { "-num_ops", "1000" });
  }

  /**
   * Test helper that pairs polling a table procedure from the scheduler with taking the table
   * lock matching its operation type, and releasing that lock afterwards.
   */
  public static class TestTableProcSet {
    private final MasterProcedureScheduler queue;

    public TestTableProcSet(final MasterProcedureScheduler queue) {
      this.queue = queue;
    }

    /** Appends {@code proc} to the scheduler. */
    public void addBack(Procedure proc) {
      queue.addBack(proc);
    }

    /** Inserts {@code proc} at the front of the scheduler. */
    public void addFront(Procedure proc) {
      queue.addFront(proc);
    }

    /**
     * Polls the scheduler until a procedure is obtained together with its table lock, or until
     * the scheduler reports no queued procedures.
     * @return a procedure whose lock request did not have to wait, or {@code null} if none could
     *         be acquired
     * @throws UnsupportedOperationException if the polled procedure has an operation type this
     *                                       helper does not handle
     */
    public Procedure acquire() {
      Procedure proc = null;
      boolean waiting = true;
      while (waiting && queue.size() > 0) {
        // Timeout unit is presumably nanoseconds (i.e. 100 ms) -- confirm against the scheduler.
        proc = queue.poll(100000000L);
        if (proc == null) {
          continue;
        }
        switch (getTableOperationType(proc)) {
          case CREATE:
          case DELETE:
          case EDIT:
            waiting = queue.waitTableExclusiveLock(proc, getTableName(proc));
            break;
          case READ:
            waiting = queue.waitTableSharedLock(proc, getTableName(proc));
            break;
          default:
            throw new UnsupportedOperationException();
        }
      }
      // The loop may also end because the scheduler ran empty right after a lock request that
      // had to wait. In that case 'proc' does not hold its lock and must not be handed out.
      return waiting ? null : proc;
    }

    /**
     * Releases the table lock taken by {@link #acquire()} for {@code proc} and runs the
     * scheduler's completion cleanup for it.
     * @throws UnsupportedOperationException if the procedure has an operation type this helper
     *                                       does not handle
     */
    public void release(Procedure proc) {
      switch (getTableOperationType(proc)) {
        case CREATE:
        case DELETE:
        case EDIT:
          queue.wakeTableExclusiveLock(proc, getTableName(proc));
          break;
        case READ:
          queue.wakeTableSharedLock(proc, getTableName(proc));
          break;
        default:
          throw new UnsupportedOperationException();
      }
      queue.completionCleanup(proc);
    }

    /** Returns the table of {@code proc}, which must be a {@link TableProcedureInterface}. */
    public TableName getTableName(Procedure proc) {
      return ((TableProcedureInterface) proc).getTableName();
    }

    /**
     * Returns the table operation type of {@code proc}, which must be a
     * {@link TableProcedureInterface}.
     */
    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
      return ((TableProcedureInterface) proc).getTableOperationType();
    }
  }

  /**
   * Test helper that pairs polling a peer procedure from the scheduler with taking the peer lock
   * matching its operation type, and releasing that lock afterwards.
   */
  public static class TestPeerProcedureSet {
    private final MasterProcedureScheduler queue;

    public TestPeerProcedureSet(final MasterProcedureScheduler queue) {
      this.queue = queue;
    }

    /** Appends {@code proc} to the scheduler. */
    public void addBack(TestPeerProcedure proc) {
      queue.addBack(proc);
    }

    /**
     * Polls the scheduler until a procedure is obtained together with its peer lock (REFRESH
     * needs no lock), or until the scheduler reports no queued procedures.
     * @return a procedure that is ready to run, or {@code null} if none could be acquired
     * @throws UnsupportedOperationException if the polled procedure has an operation type this
     *                                       helper does not handle
     */
    public TestPeerProcedure acquire() {
      TestPeerProcedure proc = null;
      boolean waiting = true;
      while (waiting && queue.size() > 0) {
        // Timeout unit is presumably nanoseconds (i.e. 100 ms) -- confirm against the scheduler.
        proc = (TestPeerProcedure) queue.poll(100000000L);
        if (proc == null) {
          continue;
        }
        switch (proc.getPeerOperationType()) {
          case ADD:
          case REMOVE:
          case ENABLE:
          case DISABLE:
          case UPDATE_CONFIG:
            waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId());
            break;
          case REFRESH:
            waiting = false;
            break;
          default:
            throw new UnsupportedOperationException();
        }
      }
      // The loop may also end because the scheduler ran empty right after a lock request that
      // had to wait. In that case 'proc' does not hold its lock and must not be handed out.
      return waiting ? null : proc;
    }

    /**
     * Releases the peer lock taken by {@link #acquire()} for {@code proc}; REFRESH holds no lock
     * and is a no-op.
     * @throws UnsupportedOperationException if the procedure has an operation type this helper
     *                                       does not handle
     */
    public void release(TestPeerProcedure proc) {
      switch (proc.getPeerOperationType()) {
        case ADD:
        case REMOVE:
        case ENABLE:
        case DISABLE:
        case UPDATE_CONFIG:
          queue.wakePeerExclusiveLock(proc, proc.getPeerId());
          break;
        case REFRESH:
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }
  }
}