001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import org.apache.yetus.audience.InterfaceAudience; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025 026/** 027 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the 028 * resource to wait on, and a queue for suspended procedures. 029 */ 030@InterfaceAudience.Private 031public class ProcedureEvent<T> { 032 private static final Logger LOG = LoggerFactory.getLogger(ProcedureEvent.class); 033 034 private final T object; 035 private boolean ready = false; 036 private ProcedureDeque suspendedProcedures = new ProcedureDeque(); 037 038 public ProcedureEvent(final T object) { 039 this.object = object; 040 } 041 042 public synchronized boolean isReady() { 043 return ready; 044 } 045 046 /** 047 * @return true if event is not ready and adds procedure to suspended queue, else returns false. 048 */ 049 public synchronized boolean suspendIfNotReady(Procedure proc) { 050 if (!ready) { 051 suspendedProcedures.addLast(proc); 052 } 053 return !ready; 054 } 055 056 /** Mark the event as not ready. */ 057 public synchronized void suspend() { 058 ready = false; 059 if (LOG.isTraceEnabled()) { 060 LOG.trace("Suspend " + toString()); 061 } 062 } 063 064 /** 065 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the 066 * event as ready. 067 * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized. 068 */ 069 public void wake(AbstractProcedureScheduler procedureScheduler) { 070 procedureScheduler.wakeEvents(new ProcedureEvent[]{this}); 071 } 072 073 /** 074 * Wakes up the suspended procedures only if the given {@code proc} is waiting on this event. 075 * <p/> 076 * Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use 077 * with caution as it will cause performance issue if there are lots of procedures waiting on the 078 * event. 079 */ 080 public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler, 081 Procedure<?> proc) { 082 if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) { 083 wake(procedureScheduler); 084 return true; 085 } 086 return false; 087 } 088 089 /** 090 * Wakes up all the given events and puts the procedures waiting on them back into 091 * ProcedureScheduler queues. 092 */ 093 public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) { 094 scheduler.wakeEvents(events); 095 } 096 097 /** 098 * Only to be used by ProcedureScheduler implementations. 099 * Reason: To wake up multiple events, locking sequence is 100 * schedLock --> synchronized (event) 101 * To wake up an event, both schedLock() and synchronized(event) are required. 102 * The order is schedLock() --> synchronized(event) because when waking up multiple events 103 * simultaneously, we keep the scheduler locked until all procedures suspended on these events 104 * have been added back to the queue (Maybe it's not required? Evaluate!) 105 * To avoid deadlocks, we want to keep the locking order same even when waking up single event. 106 * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used 107 * when waking up multiple events. 108 * Access should remain package-private. 109 */ 110 synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { 111 if (ready && !suspendedProcedures.isEmpty()) { 112 LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size()); 113 } 114 ready = true; 115 if (LOG.isTraceEnabled()) { 116 LOG.trace("Unsuspend " + toString()); 117 } 118 // wakeProcedure adds to the front of queue, so we start from last in the 119 // waitQueue' queue, so that the procedure which was added first goes in the front for 120 // the scheduler queue. 121 procedureScheduler.addFront(suspendedProcedures.descendingIterator()); 122 suspendedProcedures.clear(); 123 } 124 125 /** 126 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it 127 * here for tests. 128 */ 129 @VisibleForTesting 130 public ProcedureDeque getSuspendedProcedures() { 131 return suspendedProcedures; 132 } 133 134 @Override 135 public String toString() { 136 return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + 137 ", " + suspendedProcedures; 138 } 139}