001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import org.apache.yetus.audience.InterfaceAudience; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025 026/** 027 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the 028 * resource to wait on, and a queue for suspended procedures. 029 */ 030@InterfaceAudience.Private 031public class ProcedureEvent<T> { 032 private static final Logger LOG = LoggerFactory.getLogger(ProcedureEvent.class); 033 034 private final T object; 035 private boolean ready = false; 036 private ProcedureDeque suspendedProcedures = new ProcedureDeque(); 037 038 public ProcedureEvent(final T object) { 039 this.object = object; 040 } 041 042 public synchronized boolean isReady() { 043 return ready; 044 } 045 046 /** 047 * @return true if event is not ready and adds procedure to suspended queue, else returns false. 048 */ 049 public synchronized boolean suspendIfNotReady(Procedure proc) { 050 if (!ready) { 051 suspendedProcedures.addLast(proc); 052 } 053 return !ready; 054 } 055 056 /** Mark the event as not ready. */ 057 public synchronized void suspend() { 058 ready = false; 059 if (LOG.isTraceEnabled()) { 060 LOG.trace("Suspend " + toString()); 061 } 062 } 063 064 /** 065 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the 066 * event as ready. 067 * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized. 068 */ 069 public void wake(AbstractProcedureScheduler procedureScheduler) { 070 procedureScheduler.wakeEvents(new ProcedureEvent[]{this}); 071 } 072 073 /** 074 * Wakes up all the given events and puts the procedures waiting on them back into 075 * ProcedureScheduler queues. 076 */ 077 public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) { 078 scheduler.wakeEvents(events); 079 } 080 081 /** 082 * Only to be used by ProcedureScheduler implementations. 083 * Reason: To wake up multiple events, locking sequence is 084 * schedLock --> synchronized (event) 085 * To wake up an event, both schedLock() and synchronized(event) are required. 086 * The order is schedLock() --> synchronized(event) because when waking up multiple events 087 * simultaneously, we keep the scheduler locked until all procedures suspended on these events 088 * have been added back to the queue (Maybe it's not required? Evaluate!) 089 * To avoid deadlocks, we want to keep the locking order same even when waking up single event. 090 * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used 091 * when waking up multiple events. 092 * Access should remain package-private. 093 */ 094 synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { 095 if (ready && !suspendedProcedures.isEmpty()) { 096 LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size()); 097 } 098 ready = true; 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("Unsuspend " + toString()); 101 } 102 // wakeProcedure adds to the front of queue, so we start from last in the 103 // waitQueue' queue, so that the procedure which was added first goes in the front for 104 // the scheduler queue. 105 procedureScheduler.addFront(suspendedProcedures.descendingIterator()); 106 suspendedProcedures.clear(); 107 } 108 109 /** 110 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it 111 * here for tests. 112 */ 113 @VisibleForTesting 114 public ProcedureDeque getSuspendedProcedures() { 115 return suspendedProcedures; 116 } 117 118 @Override 119 public String toString() { 120 return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + 121 ", " + suspendedProcedures; 122 } 123}