001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 030import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure; 031import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; 032import org.apache.hadoop.hbase.procedure2.Procedure; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.junit.After; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043@Category({ MasterTests.class, MediumTests.class }) 044public class TestMasterProcedureSchedulerConcurrency { 045 046 @ClassRule 047 public static final HBaseClassTestRule CLASS_RULE = 048 HBaseClassTestRule.forClass(TestMasterProcedureSchedulerConcurrency.class); 049 050 private static final Logger LOG = 051 LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class); 052 053 private MasterProcedureScheduler queue; 054 055 @Before 056 public void setUp() throws IOException { 057 queue = new MasterProcedureScheduler(pid -> null); 058 queue.start(); 059 } 060 061 @After 062 public void tearDown() throws IOException { 063 assertEquals("proc-queue expected to be empty", 0, queue.size()); 064 queue.stop(); 065 queue.clear(); 066 } 067 068 @Test 069 public void testConcurrentPeerOperations() throws Exception { 070 TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue); 071 072 int NUM_ITEMS = 10; 073 int NUM_PEERS = 5; 074 AtomicInteger opsCount = new AtomicInteger(0); 075 for (int i = 0; i < NUM_PEERS; ++i) { 076 String peerId = String.format("test-peer-%04d", i); 077 for (int j = 1; j < NUM_ITEMS; ++j) { 078 procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD)); 079 opsCount.incrementAndGet(); 080 } 081 } 082 assertEquals(opsCount.get(), queue.size()); 083 084 Thread[] threads = new Thread[NUM_PEERS * 2]; 085 HashSet<String> concurrentPeers = new HashSet<>(); 086 ArrayList<String> failures = new ArrayList<>(); 087 AtomicInteger concurrentCount = new AtomicInteger(0); 088 for (int i = 0; i < threads.length; ++i) { 089 threads[i] = new Thread() { 090 @Override 091 public void run() { 092 while (opsCount.get() > 0) { 093 try { 094 TestPeerProcedure proc = procSet.acquire(); 095 if (proc == null) { 096 queue.signalAll(); 097 if (opsCount.get() > 0) { 098 continue; 099 } 100 break; 101 } 102 103 String peerId = proc.getPeerId(); 104 synchronized (concurrentPeers) { 105 assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId)); 106 } 107 assertTrue(opsCount.decrementAndGet() >= 0); 108 109 try { 110 long procId = proc.getProcId(); 111 int concurrent = concurrentCount.incrementAndGet(); 112 assertTrue("inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_PEERS, 113 concurrent >= 1 && concurrent <= NUM_PEERS); 114 LOG.debug( 115 "[S] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent); 116 Thread.sleep(2000); 117 concurrent = concurrentCount.decrementAndGet(); 118 LOG.debug( 119 "[E] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent); 120 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS); 121 } finally { 122 synchronized (concurrentPeers) { 123 assertTrue(concurrentPeers.remove(peerId)); 124 } 125 procSet.release(proc); 126 } 127 } catch (Throwable e) { 128 LOG.error("Failed " + e.getMessage(), e); 129 synchronized (failures) { 130 failures.add(e.getMessage()); 131 } 132 } finally { 133 queue.signalAll(); 134 } 135 } 136 } 137 }; 138 threads[i].start(); 139 } 140 141 for (int i = 0; i < threads.length; ++i) { 142 threads[i].join(); 143 } 144 assertTrue(failures.toString(), failures.isEmpty()); 145 assertEquals(0, opsCount.get()); 146 assertEquals(0, queue.size()); 147 } 148 149 /** 150 * Verify that "write" operations for a single table are serialized, but different tables can be 151 * executed in parallel. 152 */ 153 @Test 154 public void testConcurrentWriteOps() throws Exception { 155 final TestTableProcSet procSet = new TestTableProcSet(queue); 156 157 final int NUM_ITEMS = 10; 158 final int NUM_TABLES = 4; 159 final AtomicInteger opsCount = new AtomicInteger(0); 160 for (int i = 0; i < NUM_TABLES; ++i) { 161 TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); 162 for (int j = 1; j < NUM_ITEMS; ++j) { 163 procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, 164 TableProcedureInterface.TableOperationType.EDIT)); 165 opsCount.incrementAndGet(); 166 } 167 } 168 assertEquals(opsCount.get(), queue.size()); 169 170 final Thread[] threads = new Thread[NUM_TABLES * 2]; 171 final HashSet<TableName> concurrentTables = new HashSet<>(); 172 final ArrayList<String> failures = new ArrayList<>(); 173 final AtomicInteger concurrentCount = new AtomicInteger(0); 174 for (int i = 0; i < threads.length; ++i) { 175 threads[i] = new Thread() { 176 @Override 177 public void run() { 178 while (opsCount.get() > 0) { 179 try { 180 Procedure proc = procSet.acquire(); 181 if (proc == null) { 182 queue.signalAll(); 183 if (opsCount.get() > 0) { 184 continue; 185 } 186 break; 187 } 188 189 TableName tableId = procSet.getTableName(proc); 190 synchronized (concurrentTables) { 191 assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId)); 192 } 193 assertTrue(opsCount.decrementAndGet() >= 0); 194 try { 195 long procId = proc.getProcId(); 196 int concurrent = concurrentCount.incrementAndGet(); 197 assertTrue("inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_TABLES, 198 concurrent >= 1 && concurrent <= NUM_TABLES); 199 LOG.debug( 200 "[S] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent); 201 Thread.sleep(2000); 202 concurrent = concurrentCount.decrementAndGet(); 203 LOG.debug( 204 "[E] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent); 205 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); 206 } finally { 207 synchronized (concurrentTables) { 208 assertTrue(concurrentTables.remove(tableId)); 209 } 210 procSet.release(proc); 211 } 212 } catch (Throwable e) { 213 LOG.error("Failed " + e.getMessage(), e); 214 synchronized (failures) { 215 failures.add(e.getMessage()); 216 } 217 } finally { 218 queue.signalAll(); 219 } 220 } 221 } 222 }; 223 threads[i].start(); 224 } 225 for (int i = 0; i < threads.length; ++i) { 226 threads[i].join(); 227 } 228 assertTrue(failures.toString(), failures.isEmpty()); 229 assertEquals(0, opsCount.get()); 230 assertEquals(0, queue.size()); 231 232 for (int i = 1; i <= NUM_TABLES; ++i) { 233 final TableName table = TableName.valueOf(String.format("testtb-%04d", i)); 234 final TestTableProcedure dummyProc = 235 new TestTableProcedure(100, table, TableProcedureInterface.TableOperationType.DELETE); 236 assertTrue("queue should be deleted, table=" + table, 237 queue.markTableAsDeleted(table, dummyProc)); 238 } 239 } 240 241 @Test 242 public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception { 243 // Make sure the tool does not get stuck 244 MasterProcedureSchedulerPerformanceEvaluation.main(new String[] { "-num_ops", "1000" }); 245 } 246 247 public static class TestTableProcSet { 248 private final MasterProcedureScheduler queue; 249 250 public TestTableProcSet(final MasterProcedureScheduler queue) { 251 this.queue = queue; 252 } 253 254 public void addBack(Procedure proc) { 255 queue.addBack(proc); 256 } 257 258 public void addFront(Procedure proc) { 259 queue.addFront(proc); 260 } 261 262 public Procedure acquire() { 263 Procedure proc = null; 264 boolean waiting = true; 265 while (waiting && queue.size() > 0) { 266 proc = queue.poll(100000000L); 267 if (proc == null) continue; 268 switch (getTableOperationType(proc)) { 269 case CREATE: 270 case DELETE: 271 case EDIT: 272 waiting = queue.waitTableExclusiveLock(proc, getTableName(proc)); 273 break; 274 case READ: 275 waiting = queue.waitTableSharedLock(proc, getTableName(proc)); 276 break; 277 default: 278 throw new UnsupportedOperationException(); 279 } 280 } 281 return proc; 282 } 283 284 public void release(Procedure proc) { 285 switch (getTableOperationType(proc)) { 286 case CREATE: 287 case DELETE: 288 case EDIT: 289 queue.wakeTableExclusiveLock(proc, getTableName(proc)); 290 break; 291 case READ: 292 queue.wakeTableSharedLock(proc, getTableName(proc)); 293 break; 294 default: 295 throw new UnsupportedOperationException(); 296 } 297 } 298 299 public TableName getTableName(Procedure proc) { 300 return ((TableProcedureInterface) proc).getTableName(); 301 } 302 303 public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { 304 return ((TableProcedureInterface) proc).getTableOperationType(); 305 } 306 } 307 308 public static class TestPeerProcedureSet { 309 private final MasterProcedureScheduler queue; 310 311 public TestPeerProcedureSet(final MasterProcedureScheduler queue) { 312 this.queue = queue; 313 } 314 315 public void addBack(TestPeerProcedure proc) { 316 queue.addBack(proc); 317 } 318 319 public TestPeerProcedure acquire() { 320 TestPeerProcedure proc = null; 321 boolean waiting = true; 322 while (waiting && queue.size() > 0) { 323 proc = (TestPeerProcedure) queue.poll(100000000L); 324 if (proc == null) { 325 continue; 326 } 327 switch (proc.getPeerOperationType()) { 328 case ADD: 329 case REMOVE: 330 case ENABLE: 331 case DISABLE: 332 case UPDATE_CONFIG: 333 waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId()); 334 break; 335 case REFRESH: 336 waiting = false; 337 break; 338 default: 339 throw new UnsupportedOperationException(); 340 } 341 } 342 return proc; 343 } 344 345 public void release(TestPeerProcedure proc) { 346 switch (proc.getPeerOperationType()) { 347 case ADD: 348 case REMOVE: 349 case ENABLE: 350 case DISABLE: 351 case UPDATE_CONFIG: 352 queue.wakePeerExclusiveLock(proc, proc.getPeerId()); 353 break; 354 case REFRESH: 355 break; 356 default: 357 throw new UnsupportedOperationException(); 358 } 359 } 360 } 361}