001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Method;
029import java.util.Arrays;
030import java.util.List;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionInfoBuilder;
036import org.apache.hadoop.hbase.master.locking.LockProcedure;
037import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
038import org.apache.hadoop.hbase.procedure2.LockType;
039import org.apache.hadoop.hbase.procedure2.LockedResource;
040import org.apache.hadoop.hbase.procedure2.LockedResourceType;
041import org.apache.hadoop.hbase.procedure2.Procedure;
042import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.Before;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057@Category({ MasterTests.class, SmallTests.class })
058public class TestMasterProcedureScheduler {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestMasterProcedureScheduler.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
065
066  private MasterProcedureScheduler queue;
067
068  @Rule
069  public TestName name = new TestName();
070
071  @Before
072  public void setUp() throws IOException {
073    queue = new MasterProcedureScheduler(pid -> null);
074    queue.start();
075  }
076
077  @After
078  public void tearDown() throws IOException {
079    assertEquals("proc-queue expected to be empty", 0, queue.size());
080    queue.stop();
081    queue.clear();
082  }
083
084  /**
085   * Verify simple create/insert/fetch/delete of the table queue.
086   */
087  @Test
088  public void testSimpleTableOpsQueues() throws Exception {
089    final int NUM_TABLES = 10;
090    final int NUM_ITEMS = 10;
091
092    int count = 0;
093    for (int i = 1; i <= NUM_TABLES; ++i) {
094      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
095      // insert items
096      for (int j = 1; j <= NUM_ITEMS; ++j) {
097        queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
098          TableProcedureInterface.TableOperationType.EDIT));
099        assertEquals(++count, queue.size());
100      }
101    }
102    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
103
104    for (int j = 1; j <= NUM_ITEMS; ++j) {
105      for (int i = 1; i <= NUM_TABLES; ++i) {
106        Procedure<?> proc = queue.poll();
107        assertTrue(proc != null);
108        TableName tableName = ((TestTableProcedure) proc).getTableName();
109        queue.waitTableExclusiveLock(proc, tableName);
110        queue.wakeTableExclusiveLock(proc, tableName);
111        queue.completionCleanup(proc);
112        assertEquals(--count, queue.size());
113        assertEquals(i * 1000 + j, proc.getProcId());
114      }
115    }
116    assertEquals(0, queue.size());
117
118    for (int i = 1; i <= NUM_TABLES; ++i) {
119      final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
120      final TestTableProcedure dummyProc =
121        new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
122      // complete the table deletion
123      assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
124    }
125  }
126
127  /**
128   * Check that the table queue is not deletable until every procedure in-progress is completed
129   * (this is a special case for write-locks).
130   */
131  @Test
132  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
133    final TableName tableName = TableName.valueOf(name.getMethodName());
134
135    final TestTableProcedure dummyProc =
136      new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
137
138    queue.addBack(
139      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
140
141    // table can't be deleted because one item is in the queue
142    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
143
144    // fetch item and take a lock
145    Procedure<?> proc = queue.poll();
146    assertEquals(1, proc.getProcId());
147    // take the xlock
148    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
149    // table can't be deleted because we have the lock
150    assertEquals(0, queue.size());
151    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
152    // release the xlock
153    queue.wakeTableExclusiveLock(proc, tableName);
154    // complete the table deletion
155    assertTrue(queue.markTableAsDeleted(tableName, proc));
156  }
157
158  /**
159   * Check that the table queue is not deletable until every procedure in-progress is completed
160   * (this is a special case for read-locks).
161   */
162  @Test
163  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
164    final TableName tableName = TableName.valueOf(name.getMethodName());
165    final int nitems = 2;
166
167    final TestTableProcedure dummyProc =
168      new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
169
170    for (int i = 1; i <= nitems; ++i) {
171      queue.addBack(
172        new TestTableProcedure(i, tableName, TableProcedureInterface.TableOperationType.READ));
173    }
174
175    // table can't be deleted because one item is in the queue
176    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
177
178    Procedure<?>[] procs = new Procedure[nitems];
179    for (int i = 0; i < nitems; ++i) {
180      // fetch item and take a lock
181      Procedure<?> proc = queue.poll();
182      procs[i] = proc;
183      assertEquals(i + 1, proc.getProcId());
184      // take the rlock
185      assertEquals(false, queue.waitTableSharedLock(proc, tableName));
186      // table can't be deleted because we have locks and/or items in the queue
187      assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
188    }
189
190    for (int i = 0; i < nitems; ++i) {
191      // table can't be deleted because we have locks
192      assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
193      // release the rlock
194      queue.wakeTableSharedLock(procs[i], tableName);
195    }
196
197    // there are no items and no lock in the queeu
198    assertEquals(0, queue.size());
199    // complete the table deletion
200    assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
201  }
202
203  /**
204   * Verify the correct logic of RWLocks on the queue
205   */
206  @Test
207  public void testVerifyRwLocks() throws Exception {
208    final TableName tableName = TableName.valueOf(name.getMethodName());
209    queue.addBack(
210      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
211    queue.addBack(
212      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
213    queue.addBack(
214      new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
215
216    // Fetch the 1st item and take the write lock
217    Procedure<?> proc = queue.poll();
218    assertEquals(1, proc.getProcId());
219    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
220
221    // Fetch the 2nd item and verify that the lock can't be acquired
222    assertEquals(null, queue.poll(0));
223
224    // Release the write lock and acquire the read lock
225    queue.wakeTableExclusiveLock(proc, tableName);
226
227    // Fetch the 2nd item and take the read lock
228    Procedure<?> rdProc = queue.poll();
229    assertEquals(2, rdProc.getProcId());
230    assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
231
232    // Fetch the 3rd item and verify that the lock can't be acquired
233    assertEquals(null, queue.poll(0));
234
235    // release the rdlock of item 2 and take the wrlock for the 3d item
236    queue.wakeTableSharedLock(rdProc, tableName);
237
238    queue.addBack(
239      new TestTableProcedure(4, tableName, TableProcedureInterface.TableOperationType.READ));
240    queue.addBack(
241      new TestTableProcedure(5, tableName, TableProcedureInterface.TableOperationType.READ));
242
243    // Fetch the 3rd item and take the write lock
244    Procedure<?> wrProc = queue.poll();
245    assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName));
246
247    // Fetch 4th item and verify that the lock can't be acquired
248    assertEquals(null, queue.poll(0));
249
250    // Release the write lock and acquire the read lock
251    queue.wakeTableExclusiveLock(wrProc, tableName);
252
253    // Fetch the 4th item and take the read lock
254    rdProc = queue.poll();
255    assertEquals(4, rdProc.getProcId());
256    assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
257
258    // Fetch the 4th item and take the read lock
259    Procedure<?> rdProc2 = queue.poll();
260    assertEquals(5, rdProc2.getProcId());
261    assertEquals(false, queue.waitTableSharedLock(rdProc2, tableName));
262
263    // Release 4th and 5th read-lock
264    queue.wakeTableSharedLock(rdProc, tableName);
265    queue.wakeTableSharedLock(rdProc2, tableName);
266
267    // remove table queue
268    assertEquals(0, queue.size());
269    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName, wrProc));
270  }
271
272  @Test
273  public void testVerifyNamespaceRwLocks() throws Exception {
274    String nsName1 = "ns1";
275    String nsName2 = "ns2";
276    TableName tableName1 = TableName.valueOf(nsName1, name.getMethodName());
277    TableName tableName2 = TableName.valueOf(nsName2, name.getMethodName());
278    queue.addBack(
279      new TestNamespaceProcedure(1, nsName1, TableProcedureInterface.TableOperationType.EDIT));
280    queue.addBack(
281      new TestTableProcedure(2, tableName1, TableProcedureInterface.TableOperationType.EDIT));
282    queue.addBack(
283      new TestTableProcedure(3, tableName2, TableProcedureInterface.TableOperationType.EDIT));
284    queue.addBack(
285      new TestNamespaceProcedure(4, nsName2, TableProcedureInterface.TableOperationType.EDIT));
286
287    // Fetch the 1st item and take the write lock
288    Procedure<?> procNs1 = queue.poll();
289    assertEquals(1, procNs1.getProcId());
290    assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1));
291
292    // namespace table has higher priority so we still return procedure for it
293    Procedure<?> procNs2 = queue.poll();
294    assertEquals(4, procNs2.getProcId());
295    assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
296    queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
297
298    // add procNs2 back in the queue
299    queue.yield(procNs2);
300
301    // again
302    procNs2 = queue.poll();
303    assertEquals(4, procNs2.getProcId());
304    assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
305
306    // ns1 and ns2 are both locked so we get nothing
307    assertNull(queue.poll());
308
309    // release the ns1 lock
310    queue.wakeNamespaceExclusiveLock(procNs1, nsName1);
311
312    // we are now able to execute table of ns1
313    long procId = queue.poll().getProcId();
314    assertEquals(2, procId);
315
316    // release ns2
317    queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
318
319    // we are now able to execute table of ns2
320    procId = queue.poll().getProcId();
321    assertEquals(3, procId);
322  }
323
324  @Test
325  public void testVerifyNamespaceXLock() throws Exception {
326    String nsName = "ns1";
327    TableName tableName = TableName.valueOf(nsName, name.getMethodName());
328    queue.addBack(
329      new TestNamespaceProcedure(1, nsName, TableProcedureInterface.TableOperationType.CREATE));
330    queue.addBack(
331      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
332
333    // Fetch the ns item and take the xlock
334    Procedure<?> proc = queue.poll();
335    assertEquals(1, proc.getProcId());
336    assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName));
337
338    // the table operation can't be executed because the ns is locked
339    assertEquals(null, queue.poll(0));
340
341    // release the ns lock
342    queue.wakeNamespaceExclusiveLock(proc, nsName);
343
344    proc = queue.poll();
345    assertEquals(2, proc.getProcId());
346    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
347    queue.wakeTableExclusiveLock(proc, tableName);
348  }
349
350  @Test
351  public void testXLockWaitingForExecutingSharedLockToRelease() {
352    final TableName tableName = TableName.valueOf(name.getMethodName());
353    final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
354      .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
355
356    queue.addBack(new TestRegionProcedure(1, tableName,
357      TableProcedureInterface.TableOperationType.REGION_ASSIGN, regionA));
358    queue.addBack(
359      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
360
361    // Fetch the 1st item and take the shared lock
362    Procedure<?> proc = queue.poll();
363    assertEquals(1, proc.getProcId());
364    assertEquals(false, queue.waitRegion(proc, regionA));
365
366    // the xlock operation in the queue can't be executed
367    assertEquals(null, queue.poll(0));
368
369    // release the shared lock
370    queue.wakeRegion(proc, regionA);
371
372    // Fetch the 2nd item and take the xlock
373    proc = queue.poll();
374    assertEquals(2, proc.getProcId());
375    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
376
377    queue.addBack(new TestRegionProcedure(3, tableName,
378      TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionA));
379
380    // everything is locked by the table operation
381    assertEquals(null, queue.poll(0));
382
383    // release the table xlock
384    queue.wakeTableExclusiveLock(proc, tableName);
385
386    // grab the last item in the queue
387    proc = queue.poll();
388    assertEquals(3, proc.getProcId());
389
390    // lock and unlock the region
391    assertEquals(false, queue.waitRegion(proc, regionA));
392    assertEquals(null, queue.poll(0));
393    queue.wakeRegion(proc, regionA);
394  }
395
396  @Test
397  public void testVerifyRegionLocks() throws Exception {
398    final TableName tableName = TableName.valueOf(name.getMethodName());
399    final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
400      .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
401    final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
402      .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
403    final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
404      .setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
405
406    queue.addBack(
407      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
408    queue.addBack(new TestRegionProcedure(2, tableName,
409      TableProcedureInterface.TableOperationType.REGION_MERGE, regionA, regionB));
410    queue.addBack(new TestRegionProcedure(3, tableName,
411      TableProcedureInterface.TableOperationType.REGION_SPLIT, regionA));
412    queue.addBack(new TestRegionProcedure(4, tableName,
413      TableProcedureInterface.TableOperationType.REGION_SPLIT, regionB));
414    queue.addBack(new TestRegionProcedure(5, tableName,
415      TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionC));
416
417    // Fetch the 1st item and take the write lock
418    Procedure<?> proc = queue.poll();
419    assertEquals(1, proc.getProcId());
420    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
421
422    // everything is locked by the table operation
423    assertEquals(null, queue.poll(0));
424
425    // release the table lock
426    queue.wakeTableExclusiveLock(proc, tableName);
427
428    // Fetch the 2nd item and the the lock on regionA and regionB
429    Procedure<?> mergeProc = queue.poll();
430    assertEquals(2, mergeProc.getProcId());
431    assertEquals(false, queue.waitRegions(mergeProc, tableName, regionA, regionB));
432
433    // Fetch the 3rd item and the try to lock region A which will fail
434    // because already locked. this procedure will go in waiting.
435    // (this stuff will be explicit until we get rid of the zk-lock)
436    Procedure<?> procA = queue.poll();
437    assertEquals(3, procA.getProcId());
438    assertEquals(true, queue.waitRegions(procA, tableName, regionA));
439
440    // Fetch the 4th item, same story as the 3rd
441    Procedure<?> procB = queue.poll();
442    assertEquals(4, procB.getProcId());
443    assertEquals(true, queue.waitRegions(procB, tableName, regionB));
444
445    // Fetch the 5th item, since it is a non-locked region we are able to execute it
446    Procedure<?> procC = queue.poll();
447    assertEquals(5, procC.getProcId());
448    assertEquals(false, queue.waitRegions(procC, tableName, regionC));
449
450    // 3rd and 4th are in the region suspended queue
451    assertEquals(null, queue.poll(0));
452
453    // Release region A-B from merge operation (procId=2)
454    queue.wakeRegions(mergeProc, tableName, regionA, regionB);
455
456    // Fetch the 3rd item, now the lock on the region is available
457    procA = queue.poll();
458    assertEquals(3, procA.getProcId());
459    assertEquals(false, queue.waitRegions(procA, tableName, regionA));
460
461    // Fetch the 4th item, now the lock on the region is available
462    procB = queue.poll();
463    assertEquals(4, procB.getProcId());
464    assertEquals(false, queue.waitRegions(procB, tableName, regionB));
465
466    // release the locks on the regions
467    queue.wakeRegions(procA, tableName, regionA);
468    queue.wakeRegions(procB, tableName, regionB);
469    queue.wakeRegions(procC, tableName, regionC);
470  }
471
472  @Test
473  public void testVerifySubProcRegionLocks() throws Exception {
474    final TableName tableName = TableName.valueOf(name.getMethodName());
475    final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
476      .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
477    final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
478      .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
479    final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
480      .setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
481
482    queue.addBack(
483      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.ENABLE));
484
485    // Fetch the 1st item from the queue, "the root procedure" and take the table lock
486    Procedure<?> rootProc = queue.poll();
487    assertEquals(1, rootProc.getProcId());
488    assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName));
489    assertEquals(null, queue.poll(0));
490
491    // Execute the 1st step of the root-proc.
492    // we should get 3 sub-proc back, one for each region.
493    // (this step is done by the executor/rootProc, we are simulating it)
494    Procedure<?>[] subProcs = new Procedure[] {
495      new TestRegionProcedure(1, 2, tableName,
496        TableProcedureInterface.TableOperationType.REGION_EDIT, regionA),
497      new TestRegionProcedure(1, 3, tableName,
498        TableProcedureInterface.TableOperationType.REGION_EDIT, regionB),
499      new TestRegionProcedure(1, 4, tableName,
500        TableProcedureInterface.TableOperationType.REGION_EDIT, regionC), };
501
502    // at this point the rootProc is going in a waiting state
503    // and the sub-procedures will be added in the queue.
504    // (this step is done by the executor, we are simulating it)
505    for (int i = subProcs.length - 1; i >= 0; --i) {
506      queue.addFront(subProcs[i]);
507    }
508    assertEquals(subProcs.length, queue.size());
509
510    // we should be able to fetch and execute all the sub-procs,
511    // since they are operating on different regions
512    for (int i = 0; i < subProcs.length; ++i) {
513      TestRegionProcedure regionProc = (TestRegionProcedure) queue.poll(0);
514      assertEquals(subProcs[i].getProcId(), regionProc.getProcId());
515      assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
516    }
517
518    // nothing else in the queue
519    assertEquals(null, queue.poll(0));
520
521    // release all the region locks
522    for (int i = 0; i < subProcs.length; ++i) {
523      TestRegionProcedure regionProc = (TestRegionProcedure) subProcs[i];
524      queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo());
525    }
526
527    // nothing else in the queue
528    assertEquals(null, queue.poll(0));
529
530    // release the table lock (for the root procedure)
531    queue.wakeTableExclusiveLock(rootProc, tableName);
532  }
533
534  @Test
535  public void testInheritedRegionXLock() {
536    final TableName tableName = TableName.valueOf(name.getMethodName());
537    final RegionInfo region = RegionInfoBuilder.newBuilder(tableName)
538      .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
539
540    queue.addBack(new TestRegionProcedure(1, tableName,
541      TableProcedureInterface.TableOperationType.REGION_SPLIT, region));
542    queue.addBack(new TestRegionProcedure(1, 2, tableName,
543      TableProcedureInterface.TableOperationType.REGION_UNASSIGN, region));
544    queue.addBack(new TestRegionProcedure(3, tableName,
545      TableProcedureInterface.TableOperationType.REGION_EDIT, region));
546
547    // fetch the root proc and take the lock on the region
548    Procedure<?> rootProc = queue.poll();
549    assertEquals(1, rootProc.getProcId());
550    assertEquals(false, queue.waitRegion(rootProc, region));
551
552    // fetch the sub-proc and take the lock on the region (inherited lock)
553    Procedure<?> childProc = queue.poll();
554    assertEquals(2, childProc.getProcId());
555    assertEquals(false, queue.waitRegion(childProc, region));
556
557    // proc-3 will be fetched but it can't take the lock
558    Procedure<?> proc = queue.poll();
559    assertEquals(3, proc.getProcId());
560    assertEquals(true, queue.waitRegion(proc, region));
561
562    // release the child lock
563    queue.wakeRegion(childProc, region);
564
565    // nothing in the queue (proc-3 is suspended)
566    assertEquals(null, queue.poll(0));
567
568    // release the root lock
569    queue.wakeRegion(rootProc, region);
570
571    // proc-3 should be now available
572    proc = queue.poll();
573    assertEquals(3, proc.getProcId());
574    assertEquals(false, queue.waitRegion(proc, region));
575    queue.wakeRegion(proc, region);
576  }
577
578  @Test
579  public void testSuspendedProcedure() throws Exception {
580    final TableName tableName = TableName.valueOf(name.getMethodName());
581
582    queue.addBack(
583      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
584    queue.addBack(
585      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
586
587    Procedure<?> proc = queue.poll();
588    assertEquals(1, proc.getProcId());
589
590    // suspend
591    ProcedureEvent<?> event = new ProcedureEvent<>("testSuspendedProcedureEvent");
592    assertEquals(true, event.suspendIfNotReady(proc));
593
594    proc = queue.poll();
595    assertEquals(2, proc.getProcId());
596    assertEquals(null, queue.poll(0));
597
598    // resume
599    event.wake(queue);
600
601    proc = queue.poll();
602    assertEquals(1, proc.getProcId());
603    assertEquals(null, queue.poll(0));
604  }
605
606  private static RegionInfo[] generateRegionInfo(final TableName tableName) {
607    return new RegionInfo[] {
608      RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("a"))
609        .setEndKey(Bytes.toBytes("b")).build(),
610      RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("b"))
611        .setEndKey(Bytes.toBytes("c")).build(),
612      RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("c"))
613        .setEndKey(Bytes.toBytes("d")).build() };
614  }
615
616  @Test
617  public void testParentXLockAndChildrenSharedLock() throws Exception {
618    final TableName tableName = TableName.valueOf(name.getMethodName());
619    final RegionInfo[] regions = generateRegionInfo(tableName);
620    final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
621    for (int i = 0; i < regions.length; ++i) {
622      childProcs[i] = new TestRegionProcedure(1, 2 + i, tableName,
623        TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
624    }
625    testInheritedXLockAndChildrenSharedLock(tableName,
626      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
627      childProcs);
628  }
629
630  @Test
631  public void testRootXLockAndChildrenSharedLock() throws Exception {
632    final TableName tableName = TableName.valueOf(name.getMethodName());
633    final RegionInfo[] regions = generateRegionInfo(tableName);
634    final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
635    for (int i = 0; i < regions.length; ++i) {
636      childProcs[i] = new TestRegionProcedure(1, 2, 3 + i, tableName,
637        TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
638    }
639    testInheritedXLockAndChildrenSharedLock(tableName,
640      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
641      childProcs);
642  }
643
644  private void testInheritedXLockAndChildrenSharedLock(final TableName tableName,
645      final TestTableProcedure rootProc, final TestRegionProcedure[] childProcs) throws Exception {
646    queue.addBack(rootProc);
647
648    // fetch and acquire first xlock proc
649    Procedure<?> parentProc = queue.poll();
650    assertEquals(rootProc, parentProc);
651    assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
652
653    // add child procedure
654    for (int i = 0; i < childProcs.length; ++i) {
655      queue.addFront(childProcs[i]);
656    }
657
658    // add another xlock procedure (no parent)
659    queue.addBack(
660      new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.EDIT));
661
662    // fetch and execute child
663    for (int i = 0; i < childProcs.length; ++i) {
664      TestRegionProcedure childProc = (TestRegionProcedure) queue.poll();
665      LOG.debug("fetch children " + childProc);
666      assertEquals(false, queue.waitRegions(childProc, tableName, childProc.getRegionInfo()));
667      queue.wakeRegions(childProc, tableName, childProc.getRegionInfo());
668    }
669
670    // nothing available, until xlock release
671    assertEquals(null, queue.poll(0));
672
673    // release xlock
674    queue.wakeTableExclusiveLock(parentProc, tableName);
675
676    // fetch the other xlock proc
677    Procedure<?> proc = queue.poll();
678    assertEquals(100, proc.getProcId());
679    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
680    queue.wakeTableExclusiveLock(proc, tableName);
681  }
682
683  @Test
684  public void testParentXLockAndChildrenXLock() throws Exception {
685    final TableName tableName = TableName.valueOf(name.getMethodName());
686    testInheritedXLockAndChildrenXLock(tableName,
687      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
688      new TestTableProcedure(1, 2, tableName, TableProcedureInterface.TableOperationType.EDIT));
689  }
690
691  @Test
692  public void testRootXLockAndChildrenXLock() throws Exception {
693    final TableName tableName = TableName.valueOf(name.getMethodName());
694    // simulate 3 procedures: 1 (root), (2) child of root, (3) child of proc-2
695    testInheritedXLockAndChildrenXLock(tableName,
696      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
697      new TestTableProcedure(1, 2, 3, tableName, TableProcedureInterface.TableOperationType.EDIT));
698  }
699
700  private void testInheritedXLockAndChildrenXLock(final TableName tableName,
701      final TestTableProcedure rootProc, final TestTableProcedure childProc) throws Exception {
702    queue.addBack(rootProc);
703
704    // fetch and acquire first xlock proc
705    Procedure<?> parentProc = queue.poll();
706    assertEquals(rootProc, parentProc);
707    assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
708
709    // add child procedure
710    queue.addFront(childProc);
711
712    // fetch the other xlock proc
713    Procedure<?> proc = queue.poll();
714    assertEquals(childProc, proc);
715    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
716    queue.wakeTableExclusiveLock(proc, tableName);
717
718    // release xlock
719    queue.wakeTableExclusiveLock(parentProc, tableName);
720  }
721
722  @Test
723  public void testYieldWithXLockHeld() throws Exception {
724    final TableName tableName = TableName.valueOf(name.getMethodName());
725
726    queue.addBack(
727      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
728    queue.addBack(
729      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
730
731    // fetch from the queue and acquire xlock for the first proc
732    Procedure<?> proc = queue.poll();
733    assertEquals(1, proc.getProcId());
734    assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
735
736    // nothing available, until xlock release
737    assertEquals(null, queue.poll(0));
738
739    // put the proc in the queue
740    queue.yield(proc);
741
742    // fetch from the queue, it should be the one with just added back
743    proc = queue.poll();
744    assertEquals(1, proc.getProcId());
745
746    // release the xlock
747    queue.wakeTableExclusiveLock(proc, tableName);
748
749    proc = queue.poll();
750    assertEquals(2, proc.getProcId());
751  }
752
753  @Test
754  public void testYieldWithSharedLockHeld() throws Exception {
755    final TableName tableName = TableName.valueOf(name.getMethodName());
756
757    queue.addBack(
758      new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
759    queue.addBack(
760      new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
761    queue.addBack(
762      new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
763
764    // fetch and acquire the first shared-lock
765    Procedure<?> proc1 = queue.poll();
766    assertEquals(1, proc1.getProcId());
767    assertEquals(false, queue.waitTableSharedLock(proc1, tableName));
768
769    // fetch and acquire the second shared-lock
770    Procedure<?> proc2 = queue.poll();
771    assertEquals(2, proc2.getProcId());
772    assertEquals(false, queue.waitTableSharedLock(proc2, tableName));
773
774    // nothing available, until xlock release
775    assertEquals(null, queue.poll(0));
776
777    // put the procs back in the queue
778    queue.yield(proc1);
779    queue.yield(proc2);
780
781    // fetch from the queue, it should fetch the ones with just added back
782    proc1 = queue.poll();
783    assertEquals(1, proc1.getProcId());
784    proc2 = queue.poll();
785    assertEquals(2, proc2.getProcId());
786
787    // release the xlock
788    queue.wakeTableSharedLock(proc1, tableName);
789    queue.wakeTableSharedLock(proc2, tableName);
790
791    Procedure<?> proc3 = queue.poll();
792    assertEquals(3, proc3.getProcId());
793  }
794
795  public static class TestTableProcedure extends TestProcedure implements TableProcedureInterface {
796    private final TableOperationType opType;
797    private final TableName tableName;
798
799    public TestTableProcedure() {
800      throw new UnsupportedOperationException("recovery should not be triggered here");
801    }
802
803    public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
804      this(-1, procId, tableName, opType);
805    }
806
807    public TestTableProcedure(long parentProcId, long procId, TableName tableName,
808        TableOperationType opType) {
809      this(-1, parentProcId, procId, tableName, opType);
810    }
811
812    public TestTableProcedure(long rootProcId, long parentProcId, long procId, TableName tableName,
813        TableOperationType opType) {
814      super(procId, parentProcId, rootProcId, null);
815      this.tableName = tableName;
816      this.opType = opType;
817    }
818
819    @Override
820    public TableName getTableName() {
821      return tableName;
822    }
823
824    @Override
825    public TableOperationType getTableOperationType() {
826      return opType;
827    }
828
829    @Override
830    public void toStringClassDetails(final StringBuilder sb) {
831      sb.append(getClass().getSimpleName());
832      sb.append("(table=");
833      sb.append(getTableName());
834      sb.append(")");
835    }
836  }
837
838  public static class TestTableProcedureWithEvent extends TestTableProcedure {
839    private final ProcedureEvent<?> event;
840
841    public TestTableProcedureWithEvent(long procId, TableName tableName,
842        TableOperationType opType) {
843      super(procId, tableName, opType);
844      event = new ProcedureEvent<>(tableName + " procId=" + procId);
845    }
846
847    public ProcedureEvent<?> getEvent() {
848      return event;
849    }
850  }
851
852  public static class TestRegionProcedure extends TestTableProcedure {
853    private final RegionInfo[] regionInfos;
854
855    public TestRegionProcedure() {
856      throw new UnsupportedOperationException("recovery should not be triggered here");
857    }
858
859    public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType,
860        RegionInfo... regionInfos) {
861      this(-1, procId, tableName, opType, regionInfos);
862    }
863
864    public TestRegionProcedure(long parentProcId, long procId, TableName tableName,
865        TableOperationType opType, RegionInfo... regionInfos) {
866      this(-1, parentProcId, procId, tableName, opType, regionInfos);
867    }
868
869    public TestRegionProcedure(long rootProcId, long parentProcId, long procId, TableName tableName,
870        TableOperationType opType, RegionInfo... regionInfos) {
871      super(rootProcId, parentProcId, procId, tableName, opType);
872      this.regionInfos = regionInfos;
873    }
874
875    public RegionInfo[] getRegionInfo() {
876      return regionInfos;
877    }
878
879    @Override
880    public void toStringClassDetails(final StringBuilder sb) {
881      sb.append(getClass().getSimpleName());
882      sb.append("(regions=");
883      sb.append(Arrays.toString(getRegionInfo()));
884      sb.append(")");
885    }
886  }
887
888  public static class TestNamespaceProcedure extends TestProcedure
889      implements TableProcedureInterface {
890    private final TableOperationType opType;
891    private final String nsName;
892
893    public TestNamespaceProcedure() {
894      throw new UnsupportedOperationException("recovery should not be triggered here");
895    }
896
897    public TestNamespaceProcedure(long procId, String nsName, TableOperationType opType) {
898      super(procId);
899      this.nsName = nsName;
900      this.opType = opType;
901    }
902
903    @Override
904    public TableName getTableName() {
905      return TableName.NAMESPACE_TABLE_NAME;
906    }
907
908    @Override
909    public TableOperationType getTableOperationType() {
910      return opType;
911    }
912
913    @Override
914    public void toStringClassDetails(final StringBuilder sb) {
915      sb.append(getClass().getSimpleName());
916      sb.append("(ns=");
917      sb.append(nsName);
918      sb.append(")");
919    }
920  }
921
922  public static class TestPeerProcedure extends TestProcedure implements PeerProcedureInterface {
923    private final String peerId;
924    private final PeerOperationType opType;
925
926    public TestPeerProcedure(long procId, String peerId, PeerOperationType opType) {
927      super(procId);
928      this.peerId = peerId;
929      this.opType = opType;
930    }
931
932    @Override
933    public String getPeerId() {
934      return peerId;
935    }
936
937    @Override
938    public PeerOperationType getPeerOperationType() {
939      return opType;
940    }
941  }
942
943  private static LockProcedure createLockProcedure(LockType lockType, long procId)
944      throws Exception {
945    LockProcedure procedure = new LockProcedure();
946
947    Field typeField = LockProcedure.class.getDeclaredField("type");
948    typeField.setAccessible(true);
949    typeField.set(procedure, lockType);
950
951    Method setProcIdMethod = Procedure.class.getDeclaredMethod("setProcId", long.class);
952    setProcIdMethod.setAccessible(true);
953    setProcIdMethod.invoke(procedure, procId);
954
955    return procedure;
956  }
957
958  private static LockProcedure createExclusiveLockProcedure(long procId) throws Exception {
959    return createLockProcedure(LockType.EXCLUSIVE, procId);
960  }
961
962  private static LockProcedure createSharedLockProcedure(long procId) throws Exception {
963    return createLockProcedure(LockType.SHARED, procId);
964  }
965
966  private static void assertLockResource(LockedResource resource, LockedResourceType resourceType,
967      String resourceName) {
968    assertEquals(resourceType, resource.getResourceType());
969    assertEquals(resourceName, resource.getResourceName());
970  }
971
972  private static void assertExclusiveLock(LockedResource resource, Procedure<?> procedure) {
973    assertEquals(LockType.EXCLUSIVE, resource.getLockType());
974    assertEquals(procedure, resource.getExclusiveLockOwnerProcedure());
975    assertEquals(0, resource.getSharedLockCount());
976  }
977
978  private static void assertSharedLock(LockedResource resource, int lockCount) {
979    assertEquals(LockType.SHARED, resource.getLockType());
980    assertEquals(lockCount, resource.getSharedLockCount());
981  }
982
983  @Test
984  public void testListLocksServer() throws Exception {
985    LockProcedure procedure = createExclusiveLockProcedure(0);
986    queue.waitServerExclusiveLock(procedure, ServerName.valueOf("server1,1234,0"));
987
988    List<LockedResource> resources = queue.getLocks();
989    assertEquals(1, resources.size());
990
991    LockedResource serverResource = resources.get(0);
992    assertLockResource(serverResource, LockedResourceType.SERVER, "server1,1234,0");
993    assertExclusiveLock(serverResource, procedure);
994    assertTrue(serverResource.getWaitingProcedures().isEmpty());
995  }
996
997  @Test
998  public void testListLocksNamespace() throws Exception {
999    LockProcedure procedure = createExclusiveLockProcedure(1);
1000    queue.waitNamespaceExclusiveLock(procedure, "ns1");
1001
1002    List<LockedResource> locks = queue.getLocks();
1003    assertEquals(2, locks.size());
1004
1005    LockedResource namespaceResource = locks.get(0);
1006    assertLockResource(namespaceResource, LockedResourceType.NAMESPACE, "ns1");
1007    assertExclusiveLock(namespaceResource, procedure);
1008    assertTrue(namespaceResource.getWaitingProcedures().isEmpty());
1009
1010    LockedResource tableResource = locks.get(1);
1011    assertLockResource(tableResource, LockedResourceType.TABLE,
1012      TableName.NAMESPACE_TABLE_NAME.getNameAsString());
1013    assertSharedLock(tableResource, 1);
1014    assertTrue(tableResource.getWaitingProcedures().isEmpty());
1015  }
1016
1017  @Test
1018  public void testListLocksTable() throws Exception {
1019    LockProcedure procedure = createExclusiveLockProcedure(2);
1020    queue.waitTableExclusiveLock(procedure, TableName.valueOf("ns2", "table2"));
1021
1022    List<LockedResource> locks = queue.getLocks();
1023    assertEquals(2, locks.size());
1024
1025    LockedResource namespaceResource = locks.get(0);
1026    assertLockResource(namespaceResource, LockedResourceType.NAMESPACE, "ns2");
1027    assertSharedLock(namespaceResource, 1);
1028    assertTrue(namespaceResource.getWaitingProcedures().isEmpty());
1029
1030    LockedResource tableResource = locks.get(1);
1031    assertLockResource(tableResource, LockedResourceType.TABLE, "ns2:table2");
1032    assertExclusiveLock(tableResource, procedure);
1033    assertTrue(tableResource.getWaitingProcedures().isEmpty());
1034  }
1035
1036  @Test
1037  public void testListLocksRegion() throws Exception {
1038    LockProcedure procedure = createExclusiveLockProcedure(3);
1039    RegionInfo regionInfo =
1040      RegionInfoBuilder.newBuilder(TableName.valueOf("ns3", "table3")).build();
1041
1042    queue.waitRegion(procedure, regionInfo);
1043
1044    List<LockedResource> resources = queue.getLocks();
1045    assertEquals(3, resources.size());
1046
1047    LockedResource namespaceResource = resources.get(0);
1048    assertLockResource(namespaceResource, LockedResourceType.NAMESPACE, "ns3");
1049    assertSharedLock(namespaceResource, 1);
1050    assertTrue(namespaceResource.getWaitingProcedures().isEmpty());
1051
1052    LockedResource tableResource = resources.get(1);
1053    assertLockResource(tableResource, LockedResourceType.TABLE, "ns3:table3");
1054    assertSharedLock(tableResource, 1);
1055    assertTrue(tableResource.getWaitingProcedures().isEmpty());
1056
1057    LockedResource regionResource = resources.get(2);
1058    assertLockResource(regionResource, LockedResourceType.REGION, regionInfo.getEncodedName());
1059    assertExclusiveLock(regionResource, procedure);
1060    assertTrue(regionResource.getWaitingProcedures().isEmpty());
1061  }
1062
1063  @Test
1064  public void testListLocksPeer() throws Exception {
1065    String peerId = "1";
1066    LockProcedure procedure = createExclusiveLockProcedure(4);
1067    queue.waitPeerExclusiveLock(procedure, peerId);
1068
1069    List<LockedResource> locks = queue.getLocks();
1070    assertEquals(1, locks.size());
1071
1072    LockedResource resource = locks.get(0);
1073    assertLockResource(resource, LockedResourceType.PEER, peerId);
1074    assertExclusiveLock(resource, procedure);
1075    assertTrue(resource.getWaitingProcedures().isEmpty());
1076
1077    // Try to acquire the exclusive lock again with same procedure
1078    assertFalse(queue.waitPeerExclusiveLock(procedure, peerId));
1079
1080    // Try to acquire the exclusive lock again with new procedure
1081    LockProcedure procedure2 = createExclusiveLockProcedure(5);
1082    assertTrue(queue.waitPeerExclusiveLock(procedure2, peerId));
1083
1084    // Same peerId, still only has 1 LockedResource
1085    locks = queue.getLocks();
1086    assertEquals(1, locks.size());
1087
1088    resource = locks.get(0);
1089    assertLockResource(resource, LockedResourceType.PEER, peerId);
1090    // LockedResource owner still is the origin procedure
1091    assertExclusiveLock(resource, procedure);
1092    // The new procedure should in the waiting list
1093    assertEquals(1, resource.getWaitingProcedures().size());
1094  }
1095
1096  @Test
1097  public void testListLocksWaiting() throws Exception {
1098    LockProcedure procedure1 = createExclusiveLockProcedure(1);
1099    queue.waitTableExclusiveLock(procedure1, TableName.valueOf("ns4", "table4"));
1100
1101    LockProcedure procedure2 = createSharedLockProcedure(2);
1102    queue.waitTableSharedLock(procedure2, TableName.valueOf("ns4", "table4"));
1103
1104    LockProcedure procedure3 = createExclusiveLockProcedure(3);
1105    queue.waitTableExclusiveLock(procedure3, TableName.valueOf("ns4", "table4"));
1106
1107    List<LockedResource> resources = queue.getLocks();
1108    assertEquals(2, resources.size());
1109
1110    LockedResource namespaceResource = resources.get(0);
1111    assertLockResource(namespaceResource, LockedResourceType.NAMESPACE, "ns4");
1112    assertSharedLock(namespaceResource, 1);
1113    assertTrue(namespaceResource.getWaitingProcedures().isEmpty());
1114
1115    LockedResource tableLock = resources.get(1);
1116    assertLockResource(tableLock, LockedResourceType.TABLE, "ns4:table4");
1117    assertExclusiveLock(tableLock, procedure1);
1118
1119    List<Procedure<?>> waitingProcedures = tableLock.getWaitingProcedures();
1120    assertEquals(2, waitingProcedures.size());
1121
1122    LockProcedure waitingProcedure2 = (LockProcedure) waitingProcedures.get(0);
1123    assertEquals(LockType.SHARED, waitingProcedure2.getType());
1124    assertEquals(procedure2, waitingProcedure2);
1125
1126    LockProcedure waitingProcedure3 = (LockProcedure) waitingProcedures.get(1);
1127    assertEquals(LockType.EXCLUSIVE, waitingProcedure3.getType());
1128    assertEquals(procedure3, waitingProcedure3);
1129  }
1130
1131  @Test
1132  public void testAcquireSharedLockWhileParentHoldingExclusiveLock() {
1133    TableName tableName = TableName.valueOf(name.getMethodName());
1134    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
1135
1136    TestTableProcedure parentProc = new TestTableProcedure(1, tableName, TableOperationType.EDIT);
1137    TestRegionProcedure proc =
1138      new TestRegionProcedure(1, 2, tableName, TableOperationType.REGION_EDIT, regionInfo);
1139    queue.addBack(parentProc);
1140
1141    assertSame(parentProc, queue.poll());
1142    assertFalse(queue.waitTableExclusiveLock(parentProc, tableName));
1143
1144    // The queue for this table should be added back to run queue as the parent has the xlock, so we
1145    // can poll it out.
1146    queue.addFront(proc);
1147    assertSame(proc, queue.poll());
1148    // the parent has xlock on the table, and it is OK for us to acquire shared lock on the table,
1149    // this is what this test wants to confirm
1150    assertFalse(queue.waitRegion(proc, regionInfo));
1151
1152    queue.wakeRegion(proc, regionInfo);
1153    queue.wakeTableExclusiveLock(parentProc, tableName);
1154  }
1155}