001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.util.ArrayDeque; 021import java.util.Queue; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.Lock; 024import java.util.concurrent.locks.ReentrantLock; 025import org.apache.hadoop.hbase.client.RegionInfo; 026import org.apache.hadoop.hbase.procedure2.Procedure; 027import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; 028import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * A lock implementation which supports unlock by another thread. 033 * <p> 034 * This is because we need to hold region state node lock while updating region state to meta(for 035 * keeping consistency), so it is better to yield the procedure to release the procedure worker. But 036 * after waking up the procedure, we may use another procedure worker to execute the procedure, 037 * which means we need to unlock by another thread. 038 * <p> 039 * For locking by procedure, we will also suspend the procedure if the lock is not ready, and 040 * schedule it again when lock is ready. This is very important to not block the PEWorker as we may 041 * hold the lock when updating meta, which could take a lot of time. 042 * <p> 043 * Please see HBASE-28196 for more details. 044 */ 045@InterfaceAudience.Private 046class RegionStateNodeLock { 047 048 // for better logging message 049 private final RegionInfo regionInfo; 050 051 private final Lock lock = new ReentrantLock(); 052 053 private final Queue<QueueEntry> waitingQueue = new ArrayDeque<>(); 054 055 private Object owner; 056 057 private int count; 058 059 /** 060 * This is for abstraction the common lock/unlock logic for both Thread and Procedure. 061 */ 062 private interface QueueEntry { 063 064 /** 065 * A thread, or a procedure. 066 */ 067 Object getOwner(); 068 069 /** 070 * Called when we can not hold the lock and should wait. For thread, you should wait on a 071 * condition, and for procedure, a ProcedureSuspendedException should be thrown. 072 */ 073 void await() throws ProcedureSuspendedException; 074 075 /** 076 * Called when it is your turn to get the lock. For thread, just delegate the call to 077 * condition's signal method, and for procedure, you should call the {@code wakeUp} action, to 078 * add the procedure back to procedure scheduler. 079 */ 080 void signal(); 081 } 082 083 RegionStateNodeLock(RegionInfo regionInfo) { 084 this.regionInfo = regionInfo; 085 } 086 087 private void lock0(QueueEntry entry) throws ProcedureSuspendedException { 088 lock.lock(); 089 try { 090 for (;;) { 091 if (owner == null) { 092 owner = entry.getOwner(); 093 count = 1; 094 return; 095 } 096 if (owner == entry.getOwner()) { 097 count++; 098 return; 099 } 100 waitingQueue.add(entry); 101 entry.await(); 102 } 103 } finally { 104 lock.unlock(); 105 } 106 } 107 108 private boolean tryLock0(Object lockBy) { 109 if (!lock.tryLock()) { 110 return false; 111 } 112 try { 113 if (owner == null) { 114 owner = lockBy; 115 count = 1; 116 return true; 117 } 118 if (owner == lockBy) { 119 count++; 120 return true; 121 } 122 return false; 123 } finally { 124 lock.unlock(); 125 } 126 } 127 128 private void unlock0(Object unlockBy) { 129 lock.lock(); 130 try { 131 if (owner == null) { 132 throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is not locked"); 133 } 134 if (owner != unlockBy) { 135 throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is locked by " 136 + owner + ", can not be unlocked by " + unlockBy); 137 } 138 count--; 139 if (count == 0) { 140 owner = null; 141 QueueEntry entry = waitingQueue.poll(); 142 if (entry != null) { 143 entry.signal(); 144 } 145 } 146 } finally { 147 lock.unlock(); 148 } 149 } 150 151 /** 152 * Normal lock, will set the current thread as owner. Typically you should use try...finally to 153 * call unlock in the finally block. 154 */ 155 void lock() { 156 Thread currentThread = Thread.currentThread(); 157 try { 158 lock0(new QueueEntry() { 159 160 private Condition cond; 161 162 @Override 163 public void signal() { 164 cond.signal(); 165 } 166 167 @Override 168 public Object getOwner() { 169 return currentThread; 170 } 171 172 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_AWAIT_NOT_IN_LOOP", 173 justification = "Loop is in the caller method") 174 @Override 175 public void await() { 176 if (cond == null) { 177 cond = lock.newCondition(); 178 } 179 cond.awaitUninterruptibly(); 180 } 181 }); 182 } catch (ProcedureSuspendedException e) { 183 // should not happen 184 throw new AssertionError(e); 185 } 186 } 187 188 /** 189 * Normal tryLock, will set the current thread as owner. Typically you should use try...finally to 190 * call unlock in the finally block. 191 */ 192 boolean tryLock() { 193 return tryLock0(Thread.currentThread()); 194 } 195 196 /** 197 * Normal unLock, will use the current thread as owner. Typically you should use try...finally to 198 * call unlock in the finally block. 199 */ 200 void unlock() { 201 unlock0(Thread.currentThread()); 202 } 203 204 /** 205 * Lock by a procedure. You can release the lock in another thread. 206 * <p> 207 * When the procedure can not get the lock immediately, a ProcedureSuspendedException will be 208 * thrown to suspend the procedure. And when we want to wake up a procedure, we will call the 209 * {@code wakeUp} action. Usually in the {@code wakeUp} action you should add the procedure back 210 * to procedure scheduler. 211 */ 212 void lock(Procedure<?> proc, Runnable wakeUp) throws ProcedureSuspendedException { 213 lock0(new QueueEntry() { 214 215 @Override 216 public Object getOwner() { 217 return proc; 218 } 219 220 @Override 221 public void await() throws ProcedureSuspendedException { 222 ProcedureFutureUtil.suspend(proc); 223 } 224 225 @Override 226 public void signal() { 227 // Here we need to set the owner to the procedure directly. 228 // For thread, we just block inside the lock0 method, so after signal we will continue and 229 // get the lock 230 // For procedure, the waking up here is actually a reschedule of the procedure to 231 // ProcedureExecutor, so here we need to set the owner first, and it is the procedure's duty 232 // to make sure that it has already hold the lock so do not need to call lock again, usually 233 // this should be done by calling isLockedBy method below 234 assert owner == null; 235 assert count == 0; 236 owner = proc; 237 count = 1; 238 wakeUp.run(); 239 } 240 }); 241 } 242 243 /** 244 * TryLock by a procedure. You can release the lock in another thread. 245 */ 246 boolean tryLock(Procedure<?> proc) { 247 return tryLock0(proc); 248 } 249 250 /** 251 * Unlock by a procedure. You do not need to call this method in the same thread with lock. 252 */ 253 void unlock(Procedure<?> proc) { 254 unlock0(proc); 255 } 256 257 /** 258 * Check whether the lock is locked by someone. 259 */ 260 boolean isLocked() { 261 lock.lock(); 262 try { 263 return owner != null; 264 } finally { 265 lock.unlock(); 266 } 267 } 268 269 /** 270 * Check whether the lock is locked by the given {@code lockBy}. 271 */ 272 boolean isLockedBy(Object lockBy) { 273 lock.lock(); 274 try { 275 return owner == lockBy; 276 } finally { 277 lock.unlock(); 278 } 279 } 280}