/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Allows multiple concurrent clients to lock on a numeric id with a minimal
 * memory overhead. An entry exists in the internal map only while the id is
 * locked or being waited on. The intended usage is as follows:
 *
 * <pre>
 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
 * try {
 *   // User code.
 * } finally {
 *   idLock.releaseLockEntry(lockEntry);
 * }</pre>
 */
@InterfaceAudience.Private
public class IdLock {

  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);

  /**
   * An entry returned to the client as a lock object. All mutable fields are
   * read and written only while holding the entry's own monitor.
   */
  public static final class Entry {
    private final long id;
    /** Number of threads currently blocked in wait() on this entry. */
    private int numWaiters;
    /** An entry is created by the thread that acquires it, hence born locked. */
    private boolean locked = true;
    /** Thread currently holding the lock, or null once it has been released. */
    private Thread holder;

    private Entry(long id, Thread holder) {
      this.id = id;
      this.holder = holder;
    }

    @Override
    public String toString() {
      return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
          + locked + ", holder=" + holder;
    }
  }

  /** Maps each currently locked (or waited-on) id to its lock entry. */
  private final ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();

  /**
   * Blocks until the lock corresponding to the given id is acquired.
   *
   * @param id an arbitrary number to lock on
   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
   *         the lock
   * @throws IOException if interrupted while waiting (an
   *         {@link InterruptedIOException} carrying the original interrupt as its cause)
   */
  public Entry getLockEntry(long id) throws IOException {
    Thread currentThread = Thread.currentThread();
    Entry entry = new Entry(id, currentThread);
    Entry existing;
    // Either our fresh entry goes into the map (lock acquired), or we queue up on
    // the entry that is already there.
    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
      synchronized (existing) {
        if (existing.locked) {
          ++existing.numWaiters; // Add ourselves to waiters.
          while (existing.locked) {
            try {
              existing.wait();
            } catch (InterruptedException e) {
              --existing.numWaiters; // Remove ourselves from waiters.
              // HBASE-21292
              // There is a rare case where the interrupt and the owner's call to
              // releaseLockEntry happen at the same time. The owner still saw one
              // waiter, so it did not remove the entry from the map. If the
              // interrupted thread was the last waiter, nobody else would ever
              // remove the entry, and later threads would spin forever because
              // putIfAbsent keeps returning an entry with locked == false.
              if (!existing.locked && existing.numWaiters == 0) {
                map.remove(existing.id, existing);
              }
              throw interruptedWhileWaiting(e);
            }
          }

          --existing.numWaiters; // Remove ourselves from waiters.
          existing.locked = true;
          existing.holder = currentThread;
          return existing;
        }
        // If the entry is not locked, it might already be deleted from the
        // map, so we cannot return it. We need to get our entry into the map
        // or get someone else's locked entry.
      }
    }
    return entry;
  }

  /**
   * Tries to acquire the lock corresponding to the given id, waiting at most the
   * given time for the current holder to release it. A {@code time} of zero means
   * "do not wait": the lock is returned only if it is immediately available.
   *
   * @param id an arbitrary number to lock on
   * @param time maximum time to wait in ms, must not be negative
   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
   *         the lock, or {@code null} if the lock could not be acquired in time
   * @throws IOException if interrupted while waiting (an
   *         {@link InterruptedIOException} carrying the original interrupt as its cause)
   */
  public Entry tryLockEntry(long id, long time) throws IOException {
    Preconditions.checkArgument(time >= 0);
    Thread currentThread = Thread.currentThread();
    Entry entry = new Entry(id, currentThread);
    long startTs = System.currentTimeMillis();
    long deadlineTs = startTs + time;
    if (deadlineTs < startTs) {
      // time is non-negative, so a smaller sum means the addition overflowed.
      deadlineTs = Long.MAX_VALUE;
    }
    Entry existing;
    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
      synchronized (existing) {
        if (existing.locked) {
          ++existing.numWaiters; // Add ourselves to waiters.
          try {
            while (existing.locked) {
              // Recompute the remaining time on every pass: wait() may return early
              // (spurious wakeup, or another waiter won the race for the lock).
              // The check must come before wait() because Object.wait(0) means
              // "wait without timeout", which would block a zero-timeout caller.
              long remaining = deadlineTs - System.currentTimeMillis();
              if (remaining <= 0) {
                // time is up
                return null;
              }
              existing.wait(remaining);
            }
          } catch (InterruptedException e) {
            // HBASE-21292
            // Same stale-entry cleanup as in getLockEntry(). The difference is that
            // numWaiters is decremented in the finally block below, so "we are the
            // last waiter" is numWaiters == 1 here.
            if (!existing.locked && existing.numWaiters == 1) {
              map.remove(existing.id, existing);
            }
            throw interruptedWhileWaiting(e);
          } finally {
            --existing.numWaiters; // Remove ourselves from waiters.
          }
          existing.locked = true;
          existing.holder = currentThread;
          return existing;
        }
        // If the entry is not locked, it might already be deleted from the
        // map, so we cannot return it. We need to get our entry into the map
        // or get someone else's locked entry.
      }
    }
    return entry;
  }

  /**
   * Must be called in a finally block to release the lock. Wakes up one waiter if
   * there is any, otherwise removes the monitor object for the id from the map.
   * A warning is logged if the calling thread is not the recorded holder, but the
   * entry is released regardless.
   *
   * @param entry the return value of {@link #getLockEntry(long)} or
   *          {@link #tryLockEntry(long, long)}
   */
  public void releaseLockEntry(Entry entry) {
    Thread currentThread = Thread.currentThread();
    synchronized (entry) {
      if (entry.holder != currentThread) {
        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
          entry);
      }
      entry.locked = false;
      // Forget the holder so a released entry that stays in the map for its waiters
      // is not reported as still held by the releasing thread.
      entry.holder = null;
      if (entry.numWaiters > 0) {
        entry.notify();
      } else {
        // Conditional remove: never evict a different entry that may have been
        // registered for the same id in the meantime (e.g. on a stray double release).
        map.remove(entry.id, entry);
      }
    }
  }

  /**
   * Test whether the given id is already locked by the current thread.
   *
   * @param id the id to check
   * @return true if the id is currently locked and the calling thread is its holder
   */
  public boolean isHeldByCurrentThread(long id) {
    Thread currentThread = Thread.currentThread();
    Entry entry = map.get(id);
    if (entry == null) {
      return false;
    }
    synchronized (entry) {
      // An entry can linger in the map unlocked while waiters are being woken up,
      // so the holder only counts while the entry is actually locked.
      return entry.locked && currentThread.equals(entry.holder);
    }
  }

  @VisibleForTesting
  void assertMapEmpty() {
    assert map.isEmpty();
  }

  /**
   * Polls every 100 ms until at least {@code numWaiters} threads are waiting on
   * the lock for the given id.
   *
   * @param id the id whose lock entry is inspected
   * @param numWaiters the minimum number of waiters to wait for
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  @VisibleForTesting
  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
    while (true) {
      Entry entry = map.get(id);
      if (entry != null) {
        synchronized (entry) {
          if (entry.numWaiters >= numWaiters) {
            return;
          }
        }
      }
      Thread.sleep(100);
    }
  }

  /** Builds the exception thrown to callers when a wait for the lock is interrupted. */
  private static InterruptedIOException interruptedWhileWaiting(InterruptedException cause) {
    InterruptedIOException iioe =
      new InterruptedIOException("Interrupted waiting to acquire sparse lock");
    iioe.initCause(cause);
    return iioe;
  }
}