001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.ConcurrentMap; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 029 030/** 031 * Allows multiple concurrent clients to lock on a numeric id with a minimal memory overhead. The 032 * intended usage is as follows: 033 * 034 * <pre> 035 * IdLock.Entry lockEntry = idLock.getLockEntry(id); 036 * try { 037 * // User code. 038 * } finally { 039 * idLock.releaseLockEntry(lockEntry); 040 * } 041 * </pre> 042 */ 043@InterfaceAudience.Private 044public class IdLock { 045 046 private static final Logger LOG = LoggerFactory.getLogger(IdLock.class); 047 048 /** An entry returned to the client as a lock object */ 049 public static final class Entry { 050 private final long id; 051 private int numWaiters; 052 private boolean locked = true; 053 private Thread holder; 054 055 private Entry(long id, Thread holder) { 056 this.id = id; 057 this.holder = holder; 058 } 059 060 @Override 061 public String toString() { 062 return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" + locked + ", holder=" 063 + holder; 064 } 065 } 066 067 private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>(); 068 069 /** 070 * Blocks until the lock corresponding to the given id is acquired. 071 * @param id an arbitrary number to lock on 072 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release the lock 073 * @throws IOException if interrupted 074 */ 075 public Entry getLockEntry(long id) throws IOException { 076 Thread currentThread = Thread.currentThread(); 077 Entry entry = new Entry(id, currentThread); 078 Entry existing; 079 while ((existing = map.putIfAbsent(entry.id, entry)) != null) { 080 synchronized (existing) { 081 if (existing.locked) { 082 ++existing.numWaiters; // Add ourselves to waiters. 083 while (existing.locked) { 084 try { 085 existing.wait(); 086 } catch (InterruptedException e) { 087 --existing.numWaiters; // Remove ourselves from waiters. 088 // HBASE-21292 089 // There is a rare case that interrupting and the lock owner thread call 090 // releaseLockEntry at the same time. Since the owner thread found there 091 // still one waiting, it won't remove the entry from the map. If the interrupted 092 // thread is the last one waiting on the lock, and since an exception is thrown, 093 // the 'existing' entry will stay in the map forever. Later threads which try to 094 // get this lock will stuck in a infinite loop because 095 // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false. 096 if (!existing.locked && existing.numWaiters == 0) { 097 map.remove(existing.id); 098 } 099 throw new InterruptedIOException("Interrupted waiting to acquire sparse lock"); 100 } 101 } 102 103 --existing.numWaiters; // Remove ourselves from waiters. 104 existing.locked = true; 105 existing.holder = currentThread; 106 return existing; 107 } 108 // If the entry is not locked, it might already be deleted from the 109 // map, so we cannot return it. We need to get our entry into the map 110 // or get someone else's locked entry. 111 } 112 } 113 return entry; 114 } 115 116 /** 117 * Blocks until the lock corresponding to the given id is acquired. 118 * @param id an arbitrary number to lock on 119 * @param time time to wait in ms 120 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release the lock 121 * @throws IOException if interrupted 122 */ 123 public Entry tryLockEntry(long id, long time) throws IOException { 124 Preconditions.checkArgument(time >= 0); 125 Thread currentThread = Thread.currentThread(); 126 Entry entry = new Entry(id, currentThread); 127 Entry existing; 128 long waitUtilTS = EnvironmentEdgeManager.currentTime() + time; 129 long remaining = time; 130 while ((existing = map.putIfAbsent(entry.id, entry)) != null) { 131 synchronized (existing) { 132 if (existing.locked) { 133 ++existing.numWaiters; // Add ourselves to waiters. 134 try { 135 while (existing.locked) { 136 existing.wait(remaining); 137 if (existing.locked) { 138 long currentTS = EnvironmentEdgeManager.currentTime(); 139 if (currentTS >= waitUtilTS) { 140 // time is up 141 return null; 142 } else { 143 // our wait is waken, but the lock is still taken, this can happen 144 // due to JDK Object's wait/notify mechanism. 145 // Calculate the new remaining time to wait 146 remaining = waitUtilTS - currentTS; 147 } 148 } 149 150 } 151 } catch (InterruptedException e) { 152 // HBASE-21292 153 // Please refer to the comments in getLockEntry() 154 // the difference here is that we decrease numWaiters in finally block 155 if (!existing.locked && existing.numWaiters == 1) { 156 map.remove(existing.id); 157 } 158 throw new InterruptedIOException("Interrupted waiting to acquire sparse lock"); 159 } finally { 160 --existing.numWaiters; // Remove ourselves from waiters. 161 } 162 existing.locked = true; 163 existing.holder = currentThread; 164 return existing; 165 } 166 // If the entry is not locked, it might already be deleted from the 167 // map, so we cannot return it. We need to get our entry into the map 168 // or get someone else's locked entry. 169 } 170 } 171 return entry; 172 } 173 174 /** 175 * Must be called in a finally block to decrease the internal counter and remove the monitor 176 * object for the given id if the caller is the last client. 177 * @param entry the return value of {@link #getLockEntry(long)} 178 */ 179 public void releaseLockEntry(Entry entry) { 180 Thread currentThread = Thread.currentThread(); 181 synchronized (entry) { 182 if (entry.holder != currentThread) { 183 LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread, 184 entry); 185 } 186 entry.locked = false; 187 if (entry.numWaiters > 0) { 188 entry.notify(); 189 } else { 190 map.remove(entry.id); 191 } 192 } 193 } 194 195 /** 196 * Test whether the given id is already locked by the current thread. 197 */ 198 public boolean isHeldByCurrentThread(long id) { 199 Thread currentThread = Thread.currentThread(); 200 Entry entry = map.get(id); 201 if (entry == null) { 202 return false; 203 } 204 synchronized (entry) { 205 return currentThread.equals(entry.holder); 206 } 207 } 208 209 void assertMapEmpty() { 210 assert map.isEmpty(); 211 } 212 213 public void waitForWaiters(long id, int numWaiters) throws InterruptedException { 214 for (Entry entry;;) { 215 entry = map.get(id); 216 if (entry != null) { 217 synchronized (entry) { 218 if (entry.numWaiters >= numWaiters) { 219 return; 220 } 221 } 222 } 223 Thread.sleep(100); 224 } 225 } 226}