001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.util.function.Function;
022import java.util.function.Predicate;
023import java.util.stream.Stream;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Locking for mutual exclusion between procedures. Used only by procedure framework internally.
028 * {@link LockAndQueue} has two purposes:
029 * <ol>
030 * <li>Acquire/release exclusive/shared locks.</li>
 * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} holds a private
 * {@link ProcedureDeque} (composition, not inheritance); blocked Procedures are added to that
 * Deque. Memory use per instance matters because in certain cases, there will be millions of
 * regions.</li>
 * </ol>
036 * <p/>
037 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
038 * guarded by schedLock(). <br/>
039 * There is no need of 'volatile' keyword for member variables because of memory synchronization
040 * guarantees of locks (see 'Memory Synchronization',
041 * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/>
042 * We do not implement Lock interface because we need exclusive and shared locking, and also because
043 * try-lock functions require procedure id. <br/>
044 * We do not use ReentrantReadWriteLock directly because of its high memory overhead.
045 */
046@InterfaceAudience.Private
047public class LockAndQueue implements LockStatus {
048
049  private final Function<Long, Procedure<?>> procedureRetriever;
050  private final ProcedureDeque queue = new ProcedureDeque();
051  private Procedure<?> exclusiveLockOwnerProcedure = null;
052  private int sharedLock = 0;
053
054  // ======================================================================
055  // Lock Status
056  // ======================================================================
057
058  public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) {
059    this.procedureRetriever = procedureRetriever;
060  }
061
062  @Override
063  public boolean hasExclusiveLock() {
064    return this.exclusiveLockOwnerProcedure != null;
065  }
066
067  @Override
068  public boolean hasLockAccess(Procedure<?> proc) {
069    if (exclusiveLockOwnerProcedure == null) {
070      return false;
071    }
072    long lockOwnerId = exclusiveLockOwnerProcedure.getProcId();
073    if (proc.getProcId() == lockOwnerId) {
074      return true;
075    }
076    if (!proc.hasParent()) {
077      return false;
078    }
079    // fast path to check root procedure
080    if (proc.getRootProcId() == lockOwnerId) {
081      return true;
082    }
083    // check ancestors
084    for (Procedure<?> p = proc;;) {
085      if (p.getParentProcId() == lockOwnerId) {
086        return true;
087      }
088      p = procedureRetriever.apply(p.getParentProcId());
089      if (p == null || !p.hasParent()) {
090        return false;
091      }
092    }
093  }
094
095  @Override
096  public Procedure<?> getExclusiveLockOwnerProcedure() {
097    return exclusiveLockOwnerProcedure;
098  }
099
100  @Override
101  public int getSharedLockCount() {
102    return sharedLock;
103  }
104
105  // ======================================================================
106  // try/release Shared/Exclusive lock
107  // ======================================================================
108
109  /**
110   * @return whether we have succesfully acquired the shared lock.
111   */
112  public boolean trySharedLock(Procedure<?> proc) {
113    if (hasExclusiveLock() && !hasLockAccess(proc)) {
114      return false;
115    }
116    // If no one holds the xlock, then we are free to hold the sharedLock
117    // If the parent proc or we have already held the xlock, then we return true here as
118    // xlock is more powerful then shared lock.
119    sharedLock++;
120    return true;
121  }
122
123  /**
124   * @return whether we should wake the procedures waiting on the lock here.
125   */
126  public boolean releaseSharedLock() {
127    // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our
128    // parent have held the xlock. And since there is still an exclusive lock, we do not need to
129    // wake any procedures.
130    return --sharedLock == 0 && !hasExclusiveLock();
131  }
132
133  public boolean tryExclusiveLock(Procedure<?> proc) {
134    if (isLocked()) {
135      return hasLockAccess(proc);
136    }
137    exclusiveLockOwnerProcedure = proc;
138    return true;
139  }
140
141  /**
142   * @return whether we should wake the procedures waiting on the lock here.
143   */
144  public boolean releaseExclusiveLock(Procedure<?> proc) {
145    if (exclusiveLockOwnerProcedure == null ||
146      exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()) {
147      // We are not the lock owner, it is probably inherited from the parent procedures.
148      return false;
149    }
150    exclusiveLockOwnerProcedure = null;
151    // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent
152    // proc or we have already held the xlock, and also allow releasing the locks in any order, so
153    // it could happen that the xlock is released but there are still some procs holding the shared
154    // lock.
155    // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which
156    // acquire the shared lock on the same lock. This is because we will schedule the sub proces
157    // before releasing the lock, so the sub procs could call acquire lock before we releasing the
158    // xlock.
159    return sharedLock == 0;
160  }
161
162  public boolean isWaitingQueueEmpty() {
163    return queue.isEmpty();
164  }
165
166  public Procedure<?> removeFirst() {
167    return queue.removeFirst();
168  }
169
170  public void addLast(Procedure<?> proc) {
171    queue.addLast(proc);
172  }
173
174  public int wakeWaitingProcedures(ProcedureScheduler scheduler) {
175    int count = queue.size();
176    // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so
177    // that the procedure which was added first goes in the front for the scheduler queue.
178    scheduler.addFront(queue.descendingIterator());
179    queue.clear();
180    return count;
181  }
182
183  @SuppressWarnings("rawtypes")
184  public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) {
185    return queue.stream().filter(predicate);
186  }
187
188  @Override
189  public String toString() {
190    return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") +
191      ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size();
192  }
193}