/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multi-threaded tests for {@link MasterProcedureScheduler}: several worker threads drain a
 * shared scheduler and assert that procedures taking an exclusive lock on the same peer/table
 * never run at the same time.
 */
@Category({MasterTests.class, MediumTests.class})
public class TestMasterProcedureSchedulerConcurrency {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterProcedureSchedulerConcurrency.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);

  private MasterProcedureScheduler queue;

  /** Creates and starts a fresh scheduler for every test. */
  @Before
  public void setUp() throws IOException {
    queue = new MasterProcedureScheduler(pid -> null);
    queue.start();
  }

  /** Checks that the test drained the scheduler, then stops and clears it. */
  @After
  public void tearDown() throws IOException {
    assertEquals("proc-queue expected to be empty", 0, queue.size());
    queue.stop();
    queue.clear();
  }

  /**
   * Verify that exclusive peer operations on a single peer are serialized,
   * while operations on different peers are allowed to overlap.
   */
  @Test
  public void testConcurrentPeerOperations() throws Exception {
    TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue);

    int NUM_ITEMS = 10;
    int NUM_PEERS = 5;
    AtomicInteger opsCount = new AtomicInteger(0);
    for (int i = 0; i < NUM_PEERS; ++i) {
      String peerId = String.format("test-peer-%04d", i);
      // j starts at 1, so each peer gets NUM_ITEMS - 1 procedures with ids i*100+1 .. i*100+9.
      for (int j = 1; j < NUM_ITEMS; ++j) {
        procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD));
        opsCount.incrementAndGet();
      }
    }
    assertEquals(opsCount.get(), queue.size());

    // Twice as many workers as peers, so some workers always have to wait for a peer lock.
    Thread[] threads = new Thread[NUM_PEERS * 2];
    HashSet<String> concurrentPeers = new HashSet<>();
    ArrayList<String> failures = new ArrayList<>();
    AtomicInteger concurrentCount = new AtomicInteger(0);
    for (int i = 0; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (opsCount.get() > 0) {
            try {
              TestPeerProcedure proc = procSet.acquire();
              if (proc == null) {
                queue.signalAll();
                if (opsCount.get() > 0) {
                  continue;
                }
                break;
              }

              String peerId = proc.getPeerId();
              synchronized (concurrentPeers) {
                // add() returns false if another worker is already running on this peer.
                assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId));
              }
              assertTrue(opsCount.decrementAndGet() >= 0);

              try {
                long procId = proc.getProcId();
                int concurrent = concurrentCount.incrementAndGet();
                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_PEERS,
                  concurrent >= 1 && concurrent <= NUM_PEERS);
                LOG.debug("[S] peerId={} procId={} concurrent={}", peerId, procId, concurrent);
                Thread.sleep(2000);
                concurrent = concurrentCount.decrementAndGet();
                LOG.debug("[E] peerId={} procId={} concurrent={}", peerId, procId, concurrent);
                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS);
              } finally {
                synchronized (concurrentPeers) {
                  assertTrue(concurrentPeers.remove(peerId));
                }
                procSet.release(proc);
              }
            } catch (Throwable e) {
              LOG.error("Failed {}", e.getMessage(), e);
              synchronized (failures) {
                // toString() keeps the exception type and never records a bare null message.
                failures.add(e.toString());
              }
            } finally {
              queue.signalAll();
            }
          }
        }
      };
      threads[i].start();
    }

    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    assertTrue(failures.toString(), failures.isEmpty());
    assertEquals(0, opsCount.get());
    assertEquals(0, queue.size());
  }

  /**
   * Verify that "write" operations for a single table are serialized,
   * but different tables can be executed in parallel.
   */
  @Test
  public void testConcurrentWriteOps() throws Exception {
    final TestTableProcSet procSet = new TestTableProcSet(queue);

    final int NUM_ITEMS = 10;
    final int NUM_TABLES = 4;
    final AtomicInteger opsCount = new AtomicInteger(0);
    for (int i = 0; i < NUM_TABLES; ++i) {
      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
      // j starts at 1, so each table gets NUM_ITEMS - 1 procedures with ids i*100+1 .. i*100+9.
      for (int j = 1; j < NUM_ITEMS; ++j) {
        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
        opsCount.incrementAndGet();
      }
    }
    assertEquals(opsCount.get(), queue.size());

    // Twice as many workers as tables, so some workers always have to wait for a table lock.
    final Thread[] threads = new Thread[NUM_TABLES * 2];
    final HashSet<TableName> concurrentTables = new HashSet<>();
    final ArrayList<String> failures = new ArrayList<>();
    final AtomicInteger concurrentCount = new AtomicInteger(0);
    for (int i = 0; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (opsCount.get() > 0) {
            try {
              Procedure proc = procSet.acquire();
              if (proc == null) {
                queue.signalAll();
                if (opsCount.get() > 0) {
                  continue;
                }
                break;
              }

              TableName tableId = procSet.getTableName(proc);
              synchronized (concurrentTables) {
                // add() returns false if another worker is already running on this table.
                assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
              }
              assertTrue(opsCount.decrementAndGet() >= 0);
              try {
                long procId = proc.getProcId();
                int concurrent = concurrentCount.incrementAndGet();
                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
                  concurrent >= 1 && concurrent <= NUM_TABLES);
                LOG.debug("[S] tableId={} procId={} concurrent={}", tableId, procId, concurrent);
                Thread.sleep(2000);
                concurrent = concurrentCount.decrementAndGet();
                LOG.debug("[E] tableId={} procId={} concurrent={}", tableId, procId, concurrent);
                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
              } finally {
                synchronized (concurrentTables) {
                  assertTrue(concurrentTables.remove(tableId));
                }
                procSet.release(proc);
              }
            } catch (Throwable e) {
              LOG.error("Failed {}", e.getMessage(), e);
              synchronized (failures) {
                // toString() keeps the exception type and never records a bare null message.
                failures.add(e.toString());
              }
            } finally {
              queue.signalAll();
            }
          }
        }
      };
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    assertTrue(failures.toString(), failures.isEmpty());
    assertEquals(0, opsCount.get());
    assertEquals(0, queue.size());

    // Clean up exactly the tables created above (indices 0 .. NUM_TABLES - 1). The previous
    // 1 .. NUM_TABLES range skipped testtb-0000 and probed a table that was never created.
    for (int i = 0; i < NUM_TABLES; ++i) {
      final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
      final TestTableProcedure dummyProc = new TestTableProcedure(100, table,
        TableProcedureInterface.TableOperationType.DELETE);
      assertTrue("queue should be deleted, table=" + table,
        queue.markTableAsDeleted(table, dummyProc));
    }
  }

  @Test
  public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception {
    // Make sure the tool does not get stuck
    MasterProcedureSchedulerPerformanceEvaluation.main(new String[] {
      "-num_ops", "1000"
    });
  }

  /**
   * Thin wrapper around a {@link MasterProcedureScheduler} that pairs polling a table procedure
   * with taking (and later releasing) the table lock matching its operation type.
   */
  public static class TestTableProcSet {
    private final MasterProcedureScheduler queue;

    public TestTableProcSet(final MasterProcedureScheduler queue) {
      this.queue = queue;
    }

    /** Appends {@code proc} to the back of the wrapped scheduler. */
    public void addBack(Procedure proc) {
      queue.addBack(proc);
    }

    /** Inserts {@code proc} at the front of the wrapped scheduler. */
    public void addFront(Procedure proc) {
      queue.addFront(proc);
    }

    /**
     * Polls the scheduler until a procedure obtains its table lock or the scheduler reports
     * no more queued procedures.
     *
     * @return a procedure whose lock request did not have to wait, or {@code null} if the
     *         scheduler emptied first. A procedure that was told to wait is never returned:
     *         the caller would otherwise run it, and later release a lock, without holding it.
     */
    public Procedure acquire() {
      while (queue.size() > 0) {
        Procedure proc = queue.poll(100000000L);
        if (proc == null) {
          continue;
        }
        // NOTE(review): a 'false' result from wait*Lock is treated as "lock granted", the same
        // convention the callers of this helper rely on; confirm against the scheduler API.
        final boolean waiting;
        switch (getTableOperationType(proc)) {
          case CREATE:
          case DELETE:
          case EDIT:
            waiting = queue.waitTableExclusiveLock(proc, getTableName(proc));
            break;
          case READ:
            waiting = queue.waitTableSharedLock(proc, getTableName(proc));
            break;
          default:
            throw new UnsupportedOperationException();
        }
        if (!waiting) {
          return proc;
        }
      }
      return null;
    }

    /**
     * Releases the table lock taken for {@code proc} by {@link #acquire()}.
     *
     * @throws UnsupportedOperationException if the operation type is not one handled here
     */
    public void release(Procedure proc) {
      switch (getTableOperationType(proc)) {
        case CREATE:
        case DELETE:
        case EDIT:
          queue.wakeTableExclusiveLock(proc, getTableName(proc));
          break;
        case READ:
          queue.wakeTableSharedLock(proc, getTableName(proc));
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }

    /** Returns the table name of {@code proc}, which must be a {@link TableProcedureInterface}. */
    public TableName getTableName(Procedure proc) {
      return ((TableProcedureInterface)proc).getTableName();
    }

    /** Returns the operation type of {@code proc}, which must be a {@link TableProcedureInterface}. */
    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
      return ((TableProcedureInterface)proc).getTableOperationType();
    }
  }

  /**
   * Thin wrapper around a {@link MasterProcedureScheduler} that pairs polling a peer procedure
   * with taking (and later releasing) the exclusive peer lock where the operation needs one.
   */
  public static class TestPeerProcedureSet {
    private final MasterProcedureScheduler queue;

    public TestPeerProcedureSet(final MasterProcedureScheduler queue) {
      this.queue = queue;
    }

    /** Appends {@code proc} to the back of the wrapped scheduler. */
    public void addBack(TestPeerProcedure proc) {
      queue.addBack(proc);
    }

    /**
     * Polls the scheduler until a procedure obtains its peer lock (REFRESH needs none) or the
     * scheduler reports no more queued procedures.
     *
     * @return a procedure that is ready to run, or {@code null} if the scheduler emptied first.
     *         A procedure that was told to wait is never returned: the caller would otherwise
     *         run it, and later release a lock, without holding it.
     */
    public TestPeerProcedure acquire() {
      while (queue.size() > 0) {
        TestPeerProcedure proc = (TestPeerProcedure) queue.poll(100000000L);
        if (proc == null) {
          continue;
        }
        // NOTE(review): a 'false' result from waitPeerExclusiveLock is treated as "lock
        // granted", the same convention the callers rely on; confirm against the scheduler API.
        final boolean waiting;
        switch (proc.getPeerOperationType()) {
          case ADD:
          case REMOVE:
          case ENABLE:
          case DISABLE:
          case UPDATE_CONFIG:
            waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId());
            break;
          case REFRESH:
            waiting = false;
            break;
          default:
            throw new UnsupportedOperationException();
        }
        if (!waiting) {
          return proc;
        }
      }
      return null;
    }

    /**
     * Releases the peer lock taken for {@code proc} by {@link #acquire()}; REFRESH holds no lock.
     *
     * @throws UnsupportedOperationException if the operation type is not one handled here
     */
    public void release(TestPeerProcedure proc) {
      switch (proc.getPeerOperationType()) {
        case ADD:
        case REMOVE:
        case ENABLE:
        case DISABLE:
        case UPDATE_CONFIG:
          queue.wakePeerExclusiveLock(proc, proc.getPeerId());
          break;
        case REFRESH:
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }
  }
}