1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.InterProcessLock;
30 import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
31 import org.apache.hadoop.hbase.InterProcessReadWriteLock;
32 import org.apache.hadoop.hbase.ServerName;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
42 import org.apache.zookeeper.KeeperException;
43
44
45
46
47 @InterfaceAudience.Private
48 public abstract class TableLockManager {
49
50 private static final Log LOG = LogFactory.getLog(TableLockManager.class);
51
52
53 public static final String TABLE_LOCK_ENABLE =
54 "hbase.table.lock.enable";
55
56
57 private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
58
59
60 protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
61 "hbase.table.write.lock.timeout.ms";
62
63
64 protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
65 "hbase.table.read.lock.timeout.ms";
66
67 protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
68 600 * 1000;
69
70 protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
71 600 * 1000;
72
73 public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
74
75 public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
76 600 * 1000;
77
78
79
80
81 @InterfaceAudience.Private
82 public interface TableLock {
83
84
85
86
87
88
89 void acquire() throws IOException;
90
91
92
93
94
95 void release() throws IOException;
96 }
97
98
99
100
101
102
103
104 public abstract TableLock writeLock(TableName tableName, String purpose);
105
106
107
108
109
110
111
112 public abstract TableLock readLock(TableName tableName, String purpose);
113
114
115
116
117
118
119
120 public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
121
122
123
124
125
126
127
128
129 public abstract void reapAllExpiredLocks() throws IOException;
130
131
132
133
134
135
136
137
138
139
140 public abstract void reapWriteLocks() throws IOException;
141
142
143
144
145
146
147
148 public abstract void tableDeleted(TableName tableName)
149 throws IOException;
150
151
152
153
154 public static TableLockManager createTableLockManager(Configuration conf,
155 ZooKeeperWatcher zkWatcher, ServerName serverName) {
156
157 if (conf.getBoolean(TABLE_LOCK_ENABLE,
158 DEFAULT_TABLE_LOCK_ENABLE)) {
159 long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
160 DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
161 long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
162 DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
163 long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
164 DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
165
166 return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
167 }
168
169 return new NullTableLockManager();
170 }
171
172
173
174
175 @InterfaceAudience.Private
176 public static class NullTableLockManager extends TableLockManager {
177 static class NullTableLock implements TableLock {
178 @Override
179 public void acquire() throws IOException {
180 }
181 @Override
182 public void release() throws IOException {
183 }
184 }
185 @Override
186 public TableLock writeLock(TableName tableName, String purpose) {
187 return new NullTableLock();
188 }
189 @Override
190 public TableLock readLock(TableName tableName, String purpose) {
191 return new NullTableLock();
192 }
193 @Override
194 public void reapAllExpiredLocks() throws IOException {
195 }
196 @Override
197 public void reapWriteLocks() throws IOException {
198 }
199 @Override
200 public void tableDeleted(TableName tableName) throws IOException {
201 }
202 @Override
203 public void visitAllLocks(MetadataHandler handler) throws IOException {
204 }
205 }
206
207
208 public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
209 int pblen = ProtobufUtil.lengthOfPBMagic();
210 if (bytes == null || bytes.length < pblen) {
211 return null;
212 }
213 try {
214 ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder();
215 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
216 return builder.build();
217 } catch (IOException ex) {
218 LOG.warn("Exception in deserialization", ex);
219 }
220 return null;
221 }
222
223
224
225
226 @InterfaceAudience.Private
227 private static class ZKTableLockManager extends TableLockManager {
228
229 private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
230 @Override
231 public void handleMetadata(byte[] ownerMetadata) {
232 if (!LOG.isDebugEnabled()) {
233 return;
234 }
235 ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
236 if (data == null) {
237 return;
238 }
239 LOG.debug("Table is locked by " +
240 String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
241 "purpose=%s, isShared=%s, createTime=%s]",
242 data.getTableName().getNamespace().toStringUtf8(),
243 data.getTableName().getQualifier().toStringUtf8(),
244 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
245 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
246 }
247 };
248
249 private static class TableLockImpl implements TableLock {
250 long lockTimeoutMs;
251 TableName tableName;
252 InterProcessLock lock;
253 boolean isShared;
254 ZooKeeperWatcher zkWatcher;
255 ServerName serverName;
256 String purpose;
257
258 public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
259 ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
260 this.tableName = tableName;
261 this.zkWatcher = zkWatcher;
262 this.serverName = serverName;
263 this.lockTimeoutMs = lockTimeoutMs;
264 this.isShared = isShared;
265 this.purpose = purpose;
266 }
267
268 @Override
269 public void acquire() throws IOException {
270 if (LOG.isTraceEnabled()) {
271 LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
272 " lock on: " + tableName + " for:" + purpose);
273 }
274
275 lock = createTableLock();
276 try {
277 if (lockTimeoutMs == -1) {
278
279 lock.acquire();
280 } else {
281 if (!lock.tryAcquire(lockTimeoutMs)) {
282 throw new LockTimeoutException("Timed out acquiring " +
283 (isShared ? "read" : "write") + "lock for table:" + tableName +
284 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
285 }
286 }
287 } catch (InterruptedException e) {
288 LOG.warn("Interrupted acquiring a lock for " + tableName, e);
289 Thread.currentThread().interrupt();
290 throw new InterruptedIOException("Interrupted acquiring a lock");
291 }
292 if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
293 + " lock on " + tableName + " for " + purpose);
294 }
295
296 @Override
297 public void release() throws IOException {
298 if (LOG.isTraceEnabled()) {
299 LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
300 + " lock on " + tableName);
301 }
302 if (lock == null) {
303 throw new IllegalStateException("Table " + tableName +
304 " is not locked!");
305 }
306
307 try {
308 lock.release();
309 } catch (InterruptedException e) {
310 LOG.warn("Interrupted while releasing a lock for " + tableName);
311 throw new InterruptedIOException();
312 }
313 if (LOG.isTraceEnabled()) {
314 LOG.trace("Released table lock on " + tableName);
315 }
316 }
317
318 private InterProcessLock createTableLock() {
319 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
320 tableName.getNameAsString());
321
322 ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
323 .setTableName(ProtobufUtil.toProtoTableName(tableName))
324 .setLockOwner(ProtobufUtil.toServerName(serverName))
325 .setThreadId(Thread.currentThread().getId())
326 .setPurpose(purpose)
327 .setIsShared(isShared)
328 .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
329 byte[] lockMetadata = toBytes(data);
330
331 InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
332 METADATA_HANDLER);
333 return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
334 }
335 }
336
337 private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
338 return ProtobufUtil.prependPBMagic(data.toByteArray());
339 }
340
341 private final ServerName serverName;
342 private final ZooKeeperWatcher zkWatcher;
343 private final long writeLockTimeoutMs;
344 private final long readLockTimeoutMs;
345 private final long lockExpireTimeoutMs;
346
347
348
349
350
351
352
353
354
355
356
357 public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
358 ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
359 this.zkWatcher = zkWatcher;
360 this.serverName = serverName;
361 this.writeLockTimeoutMs = writeLockTimeoutMs;
362 this.readLockTimeoutMs = readLockTimeoutMs;
363 this.lockExpireTimeoutMs = lockExpireTimeoutMs;
364 }
365
366 @Override
367 public TableLock writeLock(TableName tableName, String purpose) {
368 return new TableLockImpl(tableName, zkWatcher,
369 serverName, writeLockTimeoutMs, false, purpose);
370 }
371
372 public TableLock readLock(TableName tableName, String purpose) {
373 return new TableLockImpl(tableName, zkWatcher,
374 serverName, readLockTimeoutMs, true, purpose);
375 }
376
377 public void visitAllLocks(MetadataHandler handler) throws IOException {
378 for (String tableName : getTableNames()) {
379 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
380 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
381 zkWatcher, tableLockZNode, null);
382 lock.readLock(null).visitLocks(handler);
383 lock.writeLock(null).visitLocks(handler);
384 }
385 }
386
387 private List<String> getTableNames() throws IOException {
388
389 List<String> tableNames;
390 try {
391 tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
392 } catch (KeeperException e) {
393 LOG.error("Unexpected ZooKeeper error when listing children", e);
394 throw new IOException("Unexpected ZooKeeper exception", e);
395 }
396 return tableNames;
397 }
398
399 @Override
400 public void reapWriteLocks() throws IOException {
401
402 try {
403 for (String tableName : getTableNames()) {
404 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
405 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
406 zkWatcher, tableLockZNode, null);
407 lock.writeLock(null).reapAllLocks();
408 }
409 } catch (IOException ex) {
410 throw ex;
411 } catch (Exception ex) {
412 LOG.warn("Caught exception while reaping table write locks", ex);
413 }
414 }
415
416 @Override
417 public void reapAllExpiredLocks() throws IOException {
418
419 try {
420 for (String tableName : getTableNames()) {
421 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
422 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
423 zkWatcher, tableLockZNode, null);
424 lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
425 lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
426 }
427 } catch (IOException ex) {
428 throw ex;
429 } catch (Exception ex) {
430 throw new IOException(ex);
431 }
432 }
433
434 @Override
435 public void tableDeleted(TableName tableName) throws IOException {
436
437 String tableNameStr = tableName.getNameAsString();
438 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
439 try {
440 ZKUtil.deleteNode(zkWatcher, tableLockZNode);
441 } catch (KeeperException ex) {
442 if (ex.code() == KeeperException.Code.NOTEMPTY) {
443
444
445 LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
446 + tableLockZNode);
447 return;
448 }
449 throw new IOException(ex);
450 }
451 }
452 }
453 }