001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.function.Function;
021import java.util.function.Predicate;
022import java.util.stream.Stream;
023import org.apache.commons.lang3.builder.ToStringBuilder;
024import org.apache.commons.lang3.builder.ToStringStyle;
025import org.apache.yetus.audience.InterfaceAudience;
026
027/**
028 * Locking for mutual exclusion between procedures. Used only by procedure framework internally.
029 * {@link LockAndQueue} has two purposes:
030 * <ol>
031 * <li>Acquire/release exclusive/shared locks.</li>
032 * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends
033 * {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance
034 * over composition to keep the Deque of waiting Procedures is unusual, but we do it this way
035 * because in certain cases, there will be millions of regions. This layout uses less memory.
036 * </ol>
037 * <p/>
038 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
039 * guarded by schedLock(). <br/>
040 * There is no need of 'volatile' keyword for member variables because of memory synchronization
041 * guarantees of locks (see
042 * <a href="http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html">Memory
043 * Synchronization</a>) <br/>
044 * We do not implement Lock interface because we need exclusive and shared locking, and also because
045 * try-lock functions require procedure id. <br/>
046 * We do not use ReentrantReadWriteLock directly because of its high memory overhead.
047 */
048@InterfaceAudience.Private
049public class LockAndQueue implements LockStatus {
050
051  private final Function<Long, Procedure<?>> procedureRetriever;
052  private final ProcedureDeque queue = new ProcedureDeque();
053  private Procedure<?> exclusiveLockOwnerProcedure = null;
054  private int sharedLock = 0;
055
056  // ======================================================================
057  // Lock Status
058  // ======================================================================
059
060  public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) {
061    this.procedureRetriever = procedureRetriever;
062  }
063
064  @Override
065  public boolean hasExclusiveLock() {
066    return this.exclusiveLockOwnerProcedure != null;
067  }
068
069  @Override
070  public boolean hasLockAccess(Procedure<?> proc) {
071    if (exclusiveLockOwnerProcedure == null) {
072      return false;
073    }
074    long lockOwnerId = exclusiveLockOwnerProcedure.getProcId();
075    if (proc.getProcId() == lockOwnerId) {
076      return true;
077    }
078    if (!proc.hasParent()) {
079      return false;
080    }
081    // fast path to check root procedure
082    if (proc.getRootProcId() == lockOwnerId) {
083      return true;
084    }
085    // check ancestors
086    for (Procedure<?> p = proc;;) {
087      if (p.getParentProcId() == lockOwnerId) {
088        return true;
089      }
090      p = procedureRetriever.apply(p.getParentProcId());
091      if (p == null || !p.hasParent()) {
092        return false;
093      }
094    }
095  }
096
097  @Override
098  public Procedure<?> getExclusiveLockOwnerProcedure() {
099    return exclusiveLockOwnerProcedure;
100  }
101
102  @Override
103  public int getSharedLockCount() {
104    return sharedLock;
105  }
106
107  // ======================================================================
108  // try/release Shared/Exclusive lock
109  // ======================================================================
110
111  /** Returns whether we have succesfully acquired the shared lock. */
112  public boolean trySharedLock(Procedure<?> proc) {
113    if (hasExclusiveLock() && !hasLockAccess(proc)) {
114      return false;
115    }
116    // If no one holds the xlock, then we are free to hold the sharedLock
117    // If the parent proc or we have already held the xlock, then we return true here as
118    // xlock is more powerful then shared lock.
119    sharedLock++;
120    return true;
121  }
122
123  /** Returns whether we should wake the procedures waiting on the lock here. */
124  public boolean releaseSharedLock() {
125    // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our
126    // parent have held the xlock. And since there is still an exclusive lock, we do not need to
127    // wake any procedures.
128    return --sharedLock == 0 && !hasExclusiveLock();
129  }
130
131  public boolean tryExclusiveLock(Procedure<?> proc) {
132    if (isLocked()) {
133      return hasLockAccess(proc);
134    }
135    exclusiveLockOwnerProcedure = proc;
136    return true;
137  }
138
139  /** Returns whether we should wake the procedures waiting on the lock here. */
140  public boolean releaseExclusiveLock(Procedure<?> proc) {
141    if (
142      exclusiveLockOwnerProcedure == null
143        || exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()
144    ) {
145      // We are not the lock owner, it is probably inherited from the parent procedures.
146      return false;
147    }
148    exclusiveLockOwnerProcedure = null;
149    // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent
150    // proc or we have already held the xlock, and also allow releasing the locks in any order, so
151    // it could happen that the xlock is released but there are still some procs holding the shared
152    // lock.
153    // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which
154    // acquire the shared lock on the same lock. This is because we will schedule the sub proces
155    // before releasing the lock, so the sub procs could call acquire lock before we releasing the
156    // xlock.
157    return sharedLock == 0;
158  }
159
160  public boolean isWaitingQueueEmpty() {
161    return queue.isEmpty();
162  }
163
164  public Procedure<?> removeFirst() {
165    return queue.removeFirst();
166  }
167
168  public void addLast(Procedure<?> proc) {
169    queue.addLast(proc);
170  }
171
172  public int wakeWaitingProcedures(ProcedureScheduler scheduler) {
173    int count = queue.size();
174    // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so
175    // that the procedure which was added first goes in the front for the scheduler queue.
176    scheduler.addFront(queue.descendingIterator());
177    queue.clear();
178    return count;
179  }
180
181  @SuppressWarnings("rawtypes")
182  public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) {
183    return queue.stream().filter(predicate);
184  }
185
186  @Override
187  public String toString() {
188    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
189      .appendSuper(describeLockStatus()).append("waitingProcCount", queue.size()).build();
190  }
191}