001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.ConcurrentMap; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 029 030/** 031 * Allows multiple concurrent clients to lock on a numeric id with a minimal 032 * memory overhead. The intended usage is as follows: 033 * 034 * <pre> 035 * IdLock.Entry lockEntry = idLock.getLockEntry(id); 036 * try { 037 * // User code. 038 * } finally { 039 * idLock.releaseLockEntry(lockEntry); 040 * }</pre> 041 */ 042@InterfaceAudience.Private 043public class IdLock { 044 045 private static final Logger LOG = LoggerFactory.getLogger(IdLock.class); 046 047 /** An entry returned to the client as a lock object */ 048 public static final class Entry { 049 private final long id; 050 private int numWaiters; 051 private boolean locked = true; 052 private Thread holder; 053 054 private Entry(long id, Thread holder) { 055 this.id = id; 056 this.holder = holder; 057 } 058 059 @Override 060 public String toString() { 061 return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" 062 + locked + ", holder=" + holder; 063 } 064 } 065 066 private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>(); 067 068 /** 069 * Blocks until the lock corresponding to the given id is acquired. 070 * 071 * @param id an arbitrary number to lock on 072 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release 073 * the lock 074 * @throws IOException if interrupted 075 */ 076 public Entry getLockEntry(long id) throws IOException { 077 Thread currentThread = Thread.currentThread(); 078 Entry entry = new Entry(id, currentThread); 079 Entry existing; 080 while ((existing = map.putIfAbsent(entry.id, entry)) != null) { 081 synchronized (existing) { 082 if (existing.locked) { 083 ++existing.numWaiters; // Add ourselves to waiters. 084 while (existing.locked) { 085 try { 086 existing.wait(); 087 } catch (InterruptedException e) { 088 --existing.numWaiters; // Remove ourselves from waiters. 089 // HBASE-21292 090 // There is a rare case that interrupting and the lock owner thread call 091 // releaseLockEntry at the same time. Since the owner thread found there 092 // still one waiting, it won't remove the entry from the map. If the interrupted 093 // thread is the last one waiting on the lock, and since an exception is thrown, 094 // the 'existing' entry will stay in the map forever. Later threads which try to 095 // get this lock will stuck in a infinite loop because 096 // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false. 097 if (!existing.locked && existing.numWaiters == 0) { 098 map.remove(existing.id); 099 } 100 throw new InterruptedIOException( 101 "Interrupted waiting to acquire sparse lock"); 102 } 103 } 104 105 --existing.numWaiters; // Remove ourselves from waiters. 106 existing.locked = true; 107 existing.holder = currentThread; 108 return existing; 109 } 110 // If the entry is not locked, it might already be deleted from the 111 // map, so we cannot return it. We need to get our entry into the map 112 // or get someone else's locked entry. 113 } 114 } 115 return entry; 116 } 117 118 /** 119 * Blocks until the lock corresponding to the given id is acquired. 120 * 121 * @param id an arbitrary number to lock on 122 * @param time time to wait in ms 123 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release 124 * the lock 125 * @throws IOException if interrupted 126 */ 127 public Entry tryLockEntry(long id, long time) throws IOException { 128 Preconditions.checkArgument(time >= 0); 129 Thread currentThread = Thread.currentThread(); 130 Entry entry = new Entry(id, currentThread); 131 Entry existing; 132 long waitUtilTS = System.currentTimeMillis() + time; 133 long remaining = time; 134 while ((existing = map.putIfAbsent(entry.id, entry)) != null) { 135 synchronized (existing) { 136 if (existing.locked) { 137 ++existing.numWaiters; // Add ourselves to waiters. 138 try { 139 while (existing.locked) { 140 existing.wait(remaining); 141 if (existing.locked) { 142 long currentTS = System.currentTimeMillis(); 143 if (currentTS >= waitUtilTS) { 144 // time is up 145 return null; 146 } else { 147 // our wait is waken, but the lock is still taken, this can happen 148 // due to JDK Object's wait/notify mechanism. 149 // Calculate the new remaining time to wait 150 remaining = waitUtilTS - currentTS; 151 } 152 } 153 154 } 155 } catch (InterruptedException e) { 156 // HBASE-21292 157 // Please refer to the comments in getLockEntry() 158 // the difference here is that we decrease numWaiters in finally block 159 if (!existing.locked && existing.numWaiters == 1) { 160 map.remove(existing.id); 161 } 162 throw new InterruptedIOException( 163 "Interrupted waiting to acquire sparse lock"); 164 } finally { 165 --existing.numWaiters; // Remove ourselves from waiters. 166 } 167 existing.locked = true; 168 existing.holder = currentThread; 169 return existing; 170 } 171 // If the entry is not locked, it might already be deleted from the 172 // map, so we cannot return it. We need to get our entry into the map 173 // or get someone else's locked entry. 174 } 175 } 176 return entry; 177 } 178 179 /** 180 * Must be called in a finally block to decrease the internal counter and remove the monitor 181 * object for the given id if the caller is the last client. 182 * @param entry the return value of {@link #getLockEntry(long)} 183 */ 184 public void releaseLockEntry(Entry entry) { 185 Thread currentThread = Thread.currentThread(); 186 synchronized (entry) { 187 if (entry.holder != currentThread) { 188 LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread, 189 entry); 190 } 191 entry.locked = false; 192 if (entry.numWaiters > 0) { 193 entry.notify(); 194 } else { 195 map.remove(entry.id); 196 } 197 } 198 } 199 200 /** 201 * Test whether the given id is already locked by the current thread. 202 */ 203 public boolean isHeldByCurrentThread(long id) { 204 Thread currentThread = Thread.currentThread(); 205 Entry entry = map.get(id); 206 if (entry == null) { 207 return false; 208 } 209 synchronized (entry) { 210 return currentThread.equals(entry.holder); 211 } 212 } 213 214 void assertMapEmpty() { 215 assert map.isEmpty(); 216 } 217 218 public void waitForWaiters(long id, int numWaiters) throws InterruptedException { 219 for (Entry entry;;) { 220 entry = map.get(id); 221 if (entry != null) { 222 synchronized (entry) { 223 if (entry.numWaiters >= numWaiters) { 224 return; 225 } 226 } 227 } 228 Thread.sleep(100); 229 } 230 } 231}