001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import org.apache.yetus.audience.InterfaceAudience; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025/** 026 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the 027 * resource to wait on, and a queue for suspended procedures. 028 */ 029@InterfaceAudience.Private 030public class ProcedureEvent<T> { 031 private static final Logger LOG = LoggerFactory.getLogger(ProcedureEvent.class); 032 033 private final T object; 034 private boolean ready = false; 035 private ProcedureDeque suspendedProcedures = new ProcedureDeque(); 036 037 public ProcedureEvent(final T object) { 038 this.object = object; 039 } 040 041 public synchronized boolean isReady() { 042 return ready; 043 } 044 045 /** 046 * @return true if event is not ready and adds procedure to suspended queue, else returns false. 047 */ 048 public synchronized boolean suspendIfNotReady(Procedure proc) { 049 if (!ready) { 050 suspendedProcedures.addLast(proc); 051 } 052 return !ready; 053 } 054 055 /** Mark the event as not ready. */ 056 public synchronized void suspend() { 057 ready = false; 058 if (LOG.isTraceEnabled()) { 059 LOG.trace("Suspend " + toString()); 060 } 061 } 062 063 /** 064 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the 065 * event as ready. 066 * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized. 067 */ 068 public void wake(AbstractProcedureScheduler procedureScheduler) { 069 procedureScheduler.wakeEvents(new ProcedureEvent[]{this}); 070 } 071 072 /** 073 * Wakes up the suspended procedures only if the given {@code proc} is waiting on this event. 074 * <p/> 075 * Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use 076 * with caution as it will cause performance issue if there are lots of procedures waiting on the 077 * event. 078 */ 079 public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler, 080 Procedure<?> proc) { 081 if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) { 082 wake(procedureScheduler); 083 return true; 084 } 085 return false; 086 } 087 088 /** 089 * Wakes up all the given events and puts the procedures waiting on them back into 090 * ProcedureScheduler queues. 091 */ 092 public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) { 093 scheduler.wakeEvents(events); 094 } 095 096 /** 097 * Only to be used by ProcedureScheduler implementations. 098 * Reason: To wake up multiple events, locking sequence is 099 * schedLock --> synchronized (event) 100 * To wake up an event, both schedLock() and synchronized(event) are required. 101 * The order is schedLock() --> synchronized(event) because when waking up multiple events 102 * simultaneously, we keep the scheduler locked until all procedures suspended on these events 103 * have been added back to the queue (Maybe it's not required? Evaluate!) 104 * To avoid deadlocks, we want to keep the locking order same even when waking up single event. 105 * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used 106 * when waking up multiple events. 107 * Access should remain package-private. 108 */ 109 synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { 110 if (ready && !suspendedProcedures.isEmpty()) { 111 LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size()); 112 } 113 ready = true; 114 if (LOG.isTraceEnabled()) { 115 LOG.trace("Unsuspend " + toString()); 116 } 117 // wakeProcedure adds to the front of queue, so we start from last in the 118 // waitQueue' queue, so that the procedure which was added first goes in the front for 119 // the scheduler queue. 120 procedureScheduler.addFront(suspendedProcedures.descendingIterator()); 121 suspendedProcedures.clear(); 122 } 123 124 /** 125 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it 126 * here for tests. 127 */ 128 public ProcedureDeque getSuspendedProcedures() { 129 return suspendedProcedures; 130 } 131 132 @Override 133 public String toString() { 134 return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + 135 ", " + suspendedProcedures; 136 } 137}