001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import org.apache.yetus.audience.InterfaceAudience; 021import org.slf4j.Logger; 022import org.slf4j.LoggerFactory; 023 024/** 025 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the 026 * resource to wait on, and a queue for suspended procedures. 027 */ 028@InterfaceAudience.Private 029public class ProcedureEvent<T> { 030 private static final Logger LOG = LoggerFactory.getLogger(ProcedureEvent.class); 031 032 private final T object; 033 private boolean ready = false; 034 private ProcedureDeque suspendedProcedures = new ProcedureDeque(); 035 036 public ProcedureEvent(final T object) { 037 this.object = object; 038 } 039 040 public synchronized boolean isReady() { 041 return ready; 042 } 043 044 /** 045 * Returns true if event is not ready and adds procedure to suspended queue, else returns false. 046 */ 047 public synchronized boolean suspendIfNotReady(Procedure proc) { 048 if (!ready) { 049 suspendedProcedures.addLast(proc); 050 } 051 return !ready; 052 } 053 054 /** Mark the event as not ready. */ 055 public synchronized void suspend() { 056 ready = false; 057 if (LOG.isTraceEnabled()) { 058 LOG.trace("Suspend " + toString()); 059 } 060 } 061 062 /** 063 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the event 064 * as ready. See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not 065 * synchronized. 066 */ 067 public void wake(AbstractProcedureScheduler procedureScheduler) { 068 procedureScheduler.wakeEvents(new ProcedureEvent[] { this }); 069 } 070 071 /** 072 * Wakes up the suspended procedures only if the given {@code proc} is waiting on this event. 073 * <p/> 074 * Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use 075 * with caution as it will cause performance issue if there are lots of procedures waiting on the 076 * event. 077 */ 078 public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler, 079 Procedure<?> proc) { 080 if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) { 081 wake(procedureScheduler); 082 return true; 083 } 084 return false; 085 } 086 087 /** 088 * Wakes up all the given events and puts the procedures waiting on them back into 089 * ProcedureScheduler queues. 090 */ 091 public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent... events) { 092 scheduler.wakeEvents(events); 093 } 094 095 /** 096 * Only to be used by ProcedureScheduler implementations. Reason: To wake up multiple events, 097 * locking sequence is schedLock --> synchronized (event) To wake up an event, both schedLock() 098 * and synchronized(event) are required. The order is schedLock() --> synchronized(event) because 099 * when waking up multiple events simultaneously, we keep the scheduler locked until all 100 * procedures suspended on these events have been added back to the queue (Maybe it's not 101 * required? Evaluate!) To avoid deadlocks, we want to keep the locking order same even when 102 * waking up single event. That's why, {@link #wake(AbstractProcedureScheduler)} above uses the 103 * same code path as used when waking up multiple events. Access should remain package-private. 104 */ 105 public synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { 106 if (ready && !suspendedProcedures.isEmpty()) { 107 LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size()); 108 } 109 ready = true; 110 if (LOG.isTraceEnabled()) { 111 LOG.trace("Unsuspend " + toString()); 112 } 113 // wakeProcedure adds to the front of queue, so we start from last in the 114 // waitQueue' queue, so that the procedure which was added first goes in the front for 115 // the scheduler queue. 116 procedureScheduler.addFront(suspendedProcedures.descendingIterator()); 117 suspendedProcedures.clear(); 118 } 119 120 /** 121 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it here 122 * for tests. 123 */ 124 public ProcedureDeque getSuspendedProcedures() { 125 return suspendedProcedures; 126 } 127 128 @Override 129 public synchronized String toString() { 130 return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + ", " 131 + suspendedProcedures; 132 } 133}