001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.function.Function;
021import java.util.function.Predicate;
022import java.util.stream.Stream;
023import org.apache.yetus.audience.InterfaceAudience;
024
025/**
026 * Locking for mutual exclusion between procedures. Used only by procedure framework internally.
027 * {@link LockAndQueue} has two purposes:
028 * <ol>
029 * <li>Acquire/release exclusive/shared locks.</li>
 * <li>Maintains a list of procedures waiting on this lock. Blocked procedures are added to an
 * internal {@link ProcedureDeque} held as a private field (composition, not inheritance); callers
 * reach it only through the queue methods of this class, such as {@link #addLast(Procedure)} and
 * {@link #removeFirst()}.</li>
034 * </ol>
035 * <p/>
036 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
037 * guarded by schedLock(). <br/>
038 * There is no need of 'volatile' keyword for member variables because of memory synchronization
039 * guarantees of locks (see 'Memory Synchronization',
040 * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/>
041 * We do not implement Lock interface because we need exclusive and shared locking, and also because
042 * try-lock functions require procedure id. <br/>
043 * We do not use ReentrantReadWriteLock directly because of its high memory overhead.
044 */
045@InterfaceAudience.Private
046public class LockAndQueue implements LockStatus {
047
048  private final Function<Long, Procedure<?>> procedureRetriever;
049  private final ProcedureDeque queue = new ProcedureDeque();
050  private Procedure<?> exclusiveLockOwnerProcedure = null;
051  private int sharedLock = 0;
052
053  // ======================================================================
054  // Lock Status
055  // ======================================================================
056
057  public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) {
058    this.procedureRetriever = procedureRetriever;
059  }
060
061  @Override
062  public boolean hasExclusiveLock() {
063    return this.exclusiveLockOwnerProcedure != null;
064  }
065
066  @Override
067  public boolean hasLockAccess(Procedure<?> proc) {
068    if (exclusiveLockOwnerProcedure == null) {
069      return false;
070    }
071    long lockOwnerId = exclusiveLockOwnerProcedure.getProcId();
072    if (proc.getProcId() == lockOwnerId) {
073      return true;
074    }
075    if (!proc.hasParent()) {
076      return false;
077    }
078    // fast path to check root procedure
079    if (proc.getRootProcId() == lockOwnerId) {
080      return true;
081    }
082    // check ancestors
083    for (Procedure<?> p = proc;;) {
084      if (p.getParentProcId() == lockOwnerId) {
085        return true;
086      }
087      p = procedureRetriever.apply(p.getParentProcId());
088      if (p == null || !p.hasParent()) {
089        return false;
090      }
091    }
092  }
093
094  @Override
095  public Procedure<?> getExclusiveLockOwnerProcedure() {
096    return exclusiveLockOwnerProcedure;
097  }
098
099  @Override
100  public int getSharedLockCount() {
101    return sharedLock;
102  }
103
104  // ======================================================================
105  // try/release Shared/Exclusive lock
106  // ======================================================================
107
108  /** Returns whether we have succesfully acquired the shared lock. */
109  public boolean trySharedLock(Procedure<?> proc) {
110    if (hasExclusiveLock() && !hasLockAccess(proc)) {
111      return false;
112    }
113    // If no one holds the xlock, then we are free to hold the sharedLock
114    // If the parent proc or we have already held the xlock, then we return true here as
115    // xlock is more powerful then shared lock.
116    sharedLock++;
117    return true;
118  }
119
120  /** Returns whether we should wake the procedures waiting on the lock here. */
121  public boolean releaseSharedLock() {
122    // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our
123    // parent have held the xlock. And since there is still an exclusive lock, we do not need to
124    // wake any procedures.
125    return --sharedLock == 0 && !hasExclusiveLock();
126  }
127
128  public boolean tryExclusiveLock(Procedure<?> proc) {
129    if (isLocked()) {
130      return hasLockAccess(proc);
131    }
132    exclusiveLockOwnerProcedure = proc;
133    return true;
134  }
135
136  /** Returns whether we should wake the procedures waiting on the lock here. */
137  public boolean releaseExclusiveLock(Procedure<?> proc) {
138    if (
139      exclusiveLockOwnerProcedure == null
140        || exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()
141    ) {
142      // We are not the lock owner, it is probably inherited from the parent procedures.
143      return false;
144    }
145    exclusiveLockOwnerProcedure = null;
146    // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent
147    // proc or we have already held the xlock, and also allow releasing the locks in any order, so
148    // it could happen that the xlock is released but there are still some procs holding the shared
149    // lock.
150    // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which
151    // acquire the shared lock on the same lock. This is because we will schedule the sub proces
152    // before releasing the lock, so the sub procs could call acquire lock before we releasing the
153    // xlock.
154    return sharedLock == 0;
155  }
156
157  public boolean isWaitingQueueEmpty() {
158    return queue.isEmpty();
159  }
160
161  public Procedure<?> removeFirst() {
162    return queue.removeFirst();
163  }
164
165  public void addLast(Procedure<?> proc) {
166    queue.addLast(proc);
167  }
168
169  public int wakeWaitingProcedures(ProcedureScheduler scheduler) {
170    int count = queue.size();
171    // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so
172    // that the procedure which was added first goes in the front for the scheduler queue.
173    scheduler.addFront(queue.descendingIterator());
174    queue.clear();
175    return count;
176  }
177
178  @SuppressWarnings("rawtypes")
179  public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) {
180    return queue.stream().filter(predicate);
181  }
182
183  @Override
184  public String toString() {
185    return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE")
186      + ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size();
187  }
188}