001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; 030import org.apache.hadoop.hbase.procedure2.Procedure; 031import org.apache.hadoop.hbase.testclassification.MasterTests; 032import org.apache.hadoop.hbase.testclassification.MediumTests; 033import org.junit.After; 034import org.junit.Before; 035import org.junit.ClassRule; 036import org.junit.Test; 037import org.junit.experimental.categories.Category; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@Category({MasterTests.class, MediumTests.class}) 042public class TestMasterProcedureSchedulerConcurrency { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestMasterProcedureSchedulerConcurrency.class); 047 048 private static final Logger LOG = 049 LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class); 050 051 private MasterProcedureScheduler queue; 052 053 @Before 054 public void setUp() throws IOException { 055 queue = new MasterProcedureScheduler(pid -> null); 056 queue.start(); 057 } 058 059 @After 060 public void tearDown() throws IOException { 061 assertEquals("proc-queue expected to be empty", 0, queue.size()); 062 queue.stop(); 063 queue.clear(); 064 } 065 066 /** 067 * Verify that "write" operations for a single table are serialized, 068 * but different tables can be executed in parallel. 069 */ 070 @Test 071 public void testConcurrentWriteOps() throws Exception { 072 final TestTableProcSet procSet = new TestTableProcSet(queue); 073 074 final int NUM_ITEMS = 10; 075 final int NUM_TABLES = 4; 076 final AtomicInteger opsCount = new AtomicInteger(0); 077 for (int i = 0; i < NUM_TABLES; ++i) { 078 TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); 079 for (int j = 1; j < NUM_ITEMS; ++j) { 080 procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, 081 TableProcedureInterface.TableOperationType.EDIT)); 082 opsCount.incrementAndGet(); 083 } 084 } 085 assertEquals(opsCount.get(), queue.size()); 086 087 final Thread[] threads = new Thread[NUM_TABLES * 2]; 088 final HashSet<TableName> concurrentTables = new HashSet<>(); 089 final ArrayList<String> failures = new ArrayList<>(); 090 final AtomicInteger concurrentCount = new AtomicInteger(0); 091 for (int i = 0; i < threads.length; ++i) { 092 threads[i] = new Thread() { 093 @Override 094 public void run() { 095 while (opsCount.get() > 0) { 096 try { 097 Procedure proc = procSet.acquire(); 098 if (proc == null) { 099 queue.signalAll(); 100 if (opsCount.get() > 0) { 101 continue; 102 } 103 break; 104 } 105 106 TableName tableId = procSet.getTableName(proc); 107 synchronized (concurrentTables) { 108 assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId)); 109 } 110 assertTrue(opsCount.decrementAndGet() >= 0); 111 try { 112 long procId = proc.getProcId(); 113 int concurrent = concurrentCount.incrementAndGet(); 114 assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, 115 concurrent >= 1 && concurrent <= NUM_TABLES); 116 LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); 117 Thread.sleep(2000); 118 concurrent = concurrentCount.decrementAndGet(); 119 LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); 120 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); 121 } finally { 122 synchronized (concurrentTables) { 123 assertTrue(concurrentTables.remove(tableId)); 124 } 125 procSet.release(proc); 126 } 127 } catch (Throwable e) { 128 LOG.error("Failed " + e.getMessage(), e); 129 synchronized (failures) { 130 failures.add(e.getMessage()); 131 } 132 } finally { 133 queue.signalAll(); 134 } 135 } 136 } 137 }; 138 threads[i].start(); 139 } 140 for (int i = 0; i < threads.length; ++i) { 141 threads[i].join(); 142 } 143 assertTrue(failures.toString(), failures.isEmpty()); 144 assertEquals(0, opsCount.get()); 145 assertEquals(0, queue.size()); 146 147 for (int i = 1; i <= NUM_TABLES; ++i) { 148 final TableName table = TableName.valueOf(String.format("testtb-%04d", i)); 149 final TestTableProcedure dummyProc = new TestTableProcedure(100, table, 150 TableProcedureInterface.TableOperationType.DELETE); 151 assertTrue("queue should be deleted, table=" + table, 152 queue.markTableAsDeleted(table, dummyProc)); 153 } 154 } 155 156 @Test 157 public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception { 158 // Make sure the tool does not get stuck 159 MasterProcedureSchedulerPerformanceEvaluation.main(new String[] { 160 "-num_ops", "1000" 161 }); 162 } 163 164 public static class TestTableProcSet { 165 private final MasterProcedureScheduler queue; 166 167 public TestTableProcSet(final MasterProcedureScheduler queue) { 168 this.queue = queue; 169 } 170 171 public void addBack(Procedure proc) { 172 queue.addBack(proc); 173 } 174 175 public void addFront(Procedure proc) { 176 queue.addFront(proc); 177 } 178 179 public Procedure acquire() { 180 Procedure proc = null; 181 boolean waiting = true; 182 while (waiting && queue.size() > 0) { 183 proc = queue.poll(100000000L); 184 if (proc == null) continue; 185 switch (getTableOperationType(proc)) { 186 case CREATE: 187 case DELETE: 188 case EDIT: 189 waiting = queue.waitTableExclusiveLock(proc, getTableName(proc)); 190 break; 191 case READ: 192 waiting = queue.waitTableSharedLock(proc, getTableName(proc)); 193 break; 194 default: 195 throw new UnsupportedOperationException(); 196 } 197 } 198 return proc; 199 } 200 201 public void release(Procedure proc) { 202 switch (getTableOperationType(proc)) { 203 case CREATE: 204 case DELETE: 205 case EDIT: 206 queue.wakeTableExclusiveLock(proc, getTableName(proc)); 207 break; 208 case READ: 209 queue.wakeTableSharedLock(proc, getTableName(proc)); 210 break; 211 default: 212 throw new UnsupportedOperationException(); 213 } 214 } 215 216 public TableName getTableName(Procedure proc) { 217 return ((TableProcedureInterface)proc).getTableName(); 218 } 219 220 public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { 221 return ((TableProcedureInterface)proc).getTableOperationType(); 222 } 223 } 224}