001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.util.ArrayDeque; 021import java.util.Optional; 022import java.util.Queue; 023import java.util.function.Function; 024import org.apache.hadoop.hbase.procedure2.Procedure; 025import org.apache.yetus.audience.InterfaceAudience; 026 027import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 028 029/** 030 * To prevent multiple Create/Modify/Disable/Enable table procedures run at the same time, we will 031 * keep table procedure in this queue first before actually enqueuing it to 032 * MasterProcedureScheduler's tableQueue. See HBASE-28683 for more details 033 */ 034@InterfaceAudience.Private 035class TableProcedureWaitingQueue { 036 037 private final Function<Long, Procedure<?>> procedureRetriever; 038 039 // whether there is already a table procedure enqueued in ProcedureScheduler. 040 private Procedure<?> enqueuedProc; 041 042 private final Queue<Procedure<?>> queue = new ArrayDeque<>(); 043 044 TableProcedureWaitingQueue(Function<Long, Procedure<?>> procedureRetriever) { 045 this.procedureRetriever = procedureRetriever; 046 } 047 048 private boolean isSubProcedure(Procedure<?> proc) { 049 while (proc.hasParent()) { 050 if (proc.getParentProcId() == enqueuedProc.getProcId()) { 051 return true; 052 } 053 proc = Preconditions.checkNotNull(procedureRetriever.apply(proc.getParentProcId()), 054 "can not find parent procedure pid=%s", proc.getParentProcId()); 055 } 056 return false; 057 } 058 059 /** 060 * Return whether we can enqueue this procedure to ProcedureScheduler. 061 * <p> 062 * If returns {@code true}, you should enqueue this procedure, otherwise you just need to do 063 * nothing, as we will queue it in the waitingQueue, and you will finally get it again by calling 064 * {@link #procedureCompleted(Procedure)} method in the future. 065 */ 066 boolean procedureSubmitted(Procedure<?> proc) { 067 if (enqueuedProc == null) { 068 // no procedure enqueued yet, record it and return 069 enqueuedProc = proc; 070 return true; 071 } 072 if (proc == enqueuedProc) { 073 // the same procedure is enqueued again, this usually because the procedure comes back from 074 // WAITING state, such as all child procedures are finished 075 return true; 076 } 077 // check whether this is a sub procedure of the enqueued procedure 078 if (isSubProcedure(proc)) { 079 return true; 080 } 081 queue.add(proc); 082 return false; 083 } 084 085 /** 086 * Return the next procedure which can be enqueued to ProcedureScheduler. 087 */ 088 Optional<Procedure<?>> procedureCompleted(Procedure<?> proc) { 089 Preconditions.checkState(enqueuedProc != null, "enqueued procedure should not be null"); 090 if (enqueuedProc == proc) { 091 if (!queue.isEmpty()) { 092 enqueuedProc = queue.poll(); 093 return Optional.of(enqueuedProc); 094 } else { 095 enqueuedProc = null; 096 return Optional.empty(); 097 } 098 } else { 099 Preconditions.checkState(isSubProcedure(proc), 100 "procedure %s is not a sub procedure of enqueued procedure %s", proc, enqueuedProc); 101 return Optional.empty(); 102 } 103 } 104 105 boolean isEmpty() { 106 return enqueuedProc == null; 107 } 108 109 int waitingSize() { 110 return queue.size(); 111 } 112 113 @Override 114 public String toString() { 115 return "TableProcedureWaitingQueue [enqueuedProc=" + enqueuedProc + ", queue=" + queue + "]"; 116 } 117}