001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.util.ArrayDeque;
021import java.util.Optional;
022import java.util.Queue;
023import java.util.function.Function;
024import org.apache.hadoop.hbase.procedure2.Procedure;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
028
029/**
030 * To prevent multiple Create/Modify/Disable/Enable table procedures run at the same time, we will
031 * keep table procedure in this queue first before actually enqueuing it to
032 * MasterProcedureScheduler's tableQueue. See HBASE-28683 for more details
033 */
034@InterfaceAudience.Private
035class TableProcedureWaitingQueue {
036
037  private final Function<Long, Procedure<?>> procedureRetriever;
038
039  // whether there is already a table procedure enqueued in ProcedureScheduler.
040  private Procedure<?> enqueuedProc;
041
042  private final Queue<Procedure<?>> queue = new ArrayDeque<>();
043
044  TableProcedureWaitingQueue(Function<Long, Procedure<?>> procedureRetriever) {
045    this.procedureRetriever = procedureRetriever;
046  }
047
048  private boolean isSubProcedure(Procedure<?> proc) {
049    while (proc.hasParent()) {
050      if (proc.getParentProcId() == enqueuedProc.getProcId()) {
051        return true;
052      }
053      proc = Preconditions.checkNotNull(procedureRetriever.apply(proc.getParentProcId()),
054        "can not find parent procedure pid=%s", proc.getParentProcId());
055    }
056    return false;
057  }
058
059  /**
060   * Return whether we can enqueue this procedure to ProcedureScheduler.
061   * <p>
062   * If returns {@code true}, you should enqueue this procedure, otherwise you just need to do
063   * nothing, as we will queue it in the waitingQueue, and you will finally get it again by calling
064   * {@link #procedureCompleted(Procedure)} method in the future.
065   */
066  boolean procedureSubmitted(Procedure<?> proc) {
067    if (enqueuedProc == null) {
068      // no procedure enqueued yet, record it and return
069      enqueuedProc = proc;
070      return true;
071    }
072    if (proc == enqueuedProc) {
073      // the same procedure is enqueued again, this usually because the procedure comes back from
074      // WAITING state, such as all child procedures are finished
075      return true;
076    }
077    // check whether this is a sub procedure of the enqueued procedure
078    if (isSubProcedure(proc)) {
079      return true;
080    }
081    queue.add(proc);
082    return false;
083  }
084
085  /**
086   * Return the next procedure which can be enqueued to ProcedureScheduler.
087   */
088  Optional<Procedure<?>> procedureCompleted(Procedure<?> proc) {
089    Preconditions.checkState(enqueuedProc != null, "enqueued procedure should not be null");
090    if (enqueuedProc == proc) {
091      if (!queue.isEmpty()) {
092        enqueuedProc = queue.poll();
093        return Optional.of(enqueuedProc);
094      } else {
095        enqueuedProc = null;
096        return Optional.empty();
097      }
098    } else {
099      Preconditions.checkState(isSubProcedure(proc),
100        "procedure %s is not a sub procedure of enqueued procedure %s", proc, enqueuedProc);
101      return Optional.empty();
102    }
103  }
104
105  boolean isEmpty() {
106    return enqueuedProc == null;
107  }
108
109  int waitingSize() {
110    return queue.size();
111  }
112
113  @Override
114  public String toString() {
115    return "TableProcedureWaitingQueue [enqueuedProc=" + enqueuedProc + ", queue=" + queue + "]";
116  }
117}