001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
029import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
030import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.junit.jupiter.api.AfterEach;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Tag(MasterTests.TAG)
042@Tag(MediumTests.TAG)
043public class TestMasterProcedureSchedulerConcurrency {
044
045  private static final Logger LOG =
046    LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);
047
048  private MasterProcedureScheduler queue;
049
050  @BeforeEach
051  public void setUp() throws IOException {
052    queue = new MasterProcedureScheduler(pid -> null);
053    queue.start();
054  }
055
056  @AfterEach
057  public void tearDown() throws IOException {
058    assertEquals(0, queue.size(), "proc-queue expected to be empty");
059    queue.stop();
060    queue.clear();
061  }
062
063  @Test
064  public void testConcurrentPeerOperations() throws Exception {
065    TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue);
066
067    int NUM_ITEMS = 10;
068    int NUM_PEERS = 5;
069    AtomicInteger opsCount = new AtomicInteger(0);
070    for (int i = 0; i < NUM_PEERS; ++i) {
071      String peerId = String.format("test-peer-%04d", i);
072      for (int j = 1; j < NUM_ITEMS; ++j) {
073        procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD));
074        opsCount.incrementAndGet();
075      }
076    }
077    assertEquals(opsCount.get(), queue.size());
078
079    Thread[] threads = new Thread[NUM_PEERS * 2];
080    HashSet<String> concurrentPeers = new HashSet<>();
081    ArrayList<String> failures = new ArrayList<>();
082    AtomicInteger concurrentCount = new AtomicInteger(0);
083    for (int i = 0; i < threads.length; ++i) {
084      threads[i] = new Thread() {
085        @Override
086        public void run() {
087          while (opsCount.get() > 0) {
088            try {
089              TestPeerProcedure proc = procSet.acquire();
090              if (proc == null) {
091                queue.signalAll();
092                if (opsCount.get() > 0) {
093                  continue;
094                }
095                break;
096              }
097
098              String peerId = proc.getPeerId();
099              synchronized (concurrentPeers) {
100                assertTrue(concurrentPeers.add(peerId), "unexpected concurrency on " + peerId);
101              }
102              assertTrue(opsCount.decrementAndGet() >= 0);
103
104              try {
105                long procId = proc.getProcId();
106                int concurrent = concurrentCount.incrementAndGet();
107                assertTrue(concurrent >= 1 && concurrent <= NUM_PEERS,
108                  "inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_PEERS);
109                LOG.debug(
110                  "[S] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent);
111                Thread.sleep(2000);
112                concurrent = concurrentCount.decrementAndGet();
113                LOG.debug(
114                  "[E] peerId=" + peerId + " procId=" + procId + " concurrent=" + concurrent);
115                assertTrue(concurrent < NUM_PEERS, "dec-concurrent=" + concurrent);
116              } finally {
117                synchronized (concurrentPeers) {
118                  assertTrue(concurrentPeers.remove(peerId));
119                }
120                procSet.release(proc);
121              }
122            } catch (Throwable e) {
123              LOG.error("Failed " + e.getMessage(), e);
124              synchronized (failures) {
125                failures.add(e.getMessage());
126              }
127            } finally {
128              queue.signalAll();
129            }
130          }
131        }
132      };
133      threads[i].start();
134    }
135
136    for (int i = 0; i < threads.length; ++i) {
137      threads[i].join();
138    }
139    assertTrue(failures.isEmpty(), failures.toString());
140    assertEquals(0, opsCount.get());
141    assertEquals(0, queue.size());
142  }
143
144  /**
145   * Verify that "write" operations for a single table are serialized, but different tables can be
146   * executed in parallel.
147   */
148  @Test
149  public void testConcurrentWriteOps() throws Exception {
150    final TestTableProcSet procSet = new TestTableProcSet(queue);
151
152    final int NUM_ITEMS = 10;
153    final int NUM_TABLES = 4;
154    final AtomicInteger opsCount = new AtomicInteger(0);
155    for (int i = 0; i < NUM_TABLES; ++i) {
156      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
157      for (int j = 1; j < NUM_ITEMS; ++j) {
158        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
159          TableProcedureInterface.TableOperationType.EDIT));
160        opsCount.incrementAndGet();
161      }
162    }
163    assertEquals(opsCount.get(), queue.size());
164
165    final Thread[] threads = new Thread[NUM_TABLES * 2];
166    final HashSet<TableName> concurrentTables = new HashSet<>();
167    final ArrayList<String> failures = new ArrayList<>();
168    final AtomicInteger concurrentCount = new AtomicInteger(0);
169    for (int i = 0; i < threads.length; ++i) {
170      threads[i] = new Thread() {
171        @Override
172        public void run() {
173          while (opsCount.get() > 0) {
174            try {
175              Procedure proc = procSet.acquire();
176              if (proc == null) {
177                queue.signalAll();
178                if (opsCount.get() > 0) {
179                  continue;
180                }
181                break;
182              }
183
184              TableName tableId = procSet.getTableName(proc);
185              synchronized (concurrentTables) {
186                assertTrue(concurrentTables.add(tableId), "unexpected concurrency on " + tableId);
187              }
188              assertTrue(opsCount.decrementAndGet() >= 0);
189              try {
190                long procId = proc.getProcId();
191                int concurrent = concurrentCount.incrementAndGet();
192                assertTrue(concurrent >= 1 && concurrent <= NUM_TABLES,
193                  "inc-concurrent=" + concurrent + " 1 <= concurrent <= " + NUM_TABLES);
194                LOG.debug(
195                  "[S] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent);
196                Thread.sleep(2000);
197                concurrent = concurrentCount.decrementAndGet();
198                LOG.debug(
199                  "[E] tableId=" + tableId + " procId=" + procId + " concurrent=" + concurrent);
200                assertTrue(concurrent < NUM_TABLES, "dec-concurrent=" + concurrent);
201              } finally {
202                synchronized (concurrentTables) {
203                  assertTrue(concurrentTables.remove(tableId));
204                }
205                procSet.release(proc);
206              }
207            } catch (Throwable e) {
208              LOG.error("Failed " + e.getMessage(), e);
209              synchronized (failures) {
210                failures.add(e.getMessage());
211              }
212            } finally {
213              queue.signalAll();
214            }
215          }
216        }
217      };
218      threads[i].start();
219    }
220    for (int i = 0; i < threads.length; ++i) {
221      threads[i].join();
222    }
223    assertTrue(failures.isEmpty(), failures.toString());
224    assertEquals(0, opsCount.get());
225    assertEquals(0, queue.size());
226
227    for (int i = 1; i <= NUM_TABLES; ++i) {
228      final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
229      final TestTableProcedure dummyProc =
230        new TestTableProcedure(100, table, TableProcedureInterface.TableOperationType.DELETE);
231      assertTrue(queue.markTableAsDeleted(table, dummyProc),
232        "queue should be deleted, table=" + table);
233    }
234  }
235
236  @Test
237  public void testMasterProcedureSchedulerPerformanceEvaluation() throws Exception {
238    // Make sure the tool does not get stuck
239    MasterProcedureSchedulerPerformanceEvaluation.main(new String[] { "-num_ops", "1000" });
240  }
241
242  public static class TestTableProcSet {
243    private final MasterProcedureScheduler queue;
244
245    public TestTableProcSet(final MasterProcedureScheduler queue) {
246      this.queue = queue;
247    }
248
249    public void addBack(Procedure proc) {
250      queue.addBack(proc);
251    }
252
253    public void addFront(Procedure proc) {
254      queue.addFront(proc);
255    }
256
257    public Procedure acquire() {
258      Procedure proc = null;
259      boolean waiting = true;
260      while (waiting && queue.size() > 0) {
261        proc = queue.poll(100000000L);
262        if (proc == null) {
263          continue;
264        }
265        switch (getTableOperationType(proc)) {
266          case CREATE:
267          case DELETE:
268          case EDIT:
269            waiting = queue.waitTableExclusiveLock(proc, getTableName(proc));
270            break;
271          case READ:
272            waiting = queue.waitTableSharedLock(proc, getTableName(proc));
273            break;
274          default:
275            throw new UnsupportedOperationException();
276        }
277      }
278      return proc;
279    }
280
281    public void release(Procedure proc) {
282      switch (getTableOperationType(proc)) {
283        case CREATE:
284        case DELETE:
285        case EDIT:
286          queue.wakeTableExclusiveLock(proc, getTableName(proc));
287          break;
288        case READ:
289          queue.wakeTableSharedLock(proc, getTableName(proc));
290          break;
291        default:
292          throw new UnsupportedOperationException();
293      }
294      queue.completionCleanup(proc);
295    }
296
297    public TableName getTableName(Procedure proc) {
298      return ((TableProcedureInterface) proc).getTableName();
299    }
300
301    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
302      return ((TableProcedureInterface) proc).getTableOperationType();
303    }
304  }
305
306  public static class TestPeerProcedureSet {
307    private final MasterProcedureScheduler queue;
308
309    public TestPeerProcedureSet(final MasterProcedureScheduler queue) {
310      this.queue = queue;
311    }
312
313    public void addBack(TestPeerProcedure proc) {
314      queue.addBack(proc);
315    }
316
317    public TestPeerProcedure acquire() {
318      TestPeerProcedure proc = null;
319      boolean waiting = true;
320      while (waiting && queue.size() > 0) {
321        proc = (TestPeerProcedure) queue.poll(100000000L);
322        if (proc == null) {
323          continue;
324        }
325        switch (proc.getPeerOperationType()) {
326          case ADD:
327          case REMOVE:
328          case ENABLE:
329          case DISABLE:
330          case UPDATE_CONFIG:
331            waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId());
332            break;
333          case REFRESH:
334            waiting = false;
335            break;
336          default:
337            throw new UnsupportedOperationException();
338        }
339      }
340      return proc;
341    }
342
343    public void release(TestPeerProcedure proc) {
344      switch (proc.getPeerOperationType()) {
345        case ADD:
346        case REMOVE:
347        case ENABLE:
348        case DISABLE:
349        case UPDATE_CONFIG:
350          queue.wakePeerExclusiveLock(proc, proc.getPeerId());
351          break;
352        case REFRESH:
353          break;
354        default:
355          throw new UnsupportedOperationException();
356      }
357    }
358  }
359}