1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
import org.apache.zookeeper.KeeperException;
44
45
46
47
48 @InterfaceAudience.Private
49 public abstract class TableLockManager {
50
51 private static final Log LOG = LogFactory.getLog(TableLockManager.class);
52
53
54 public static final String TABLE_LOCK_ENABLE =
55 "hbase.table.lock.enable";
56
57
58 private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
59
60
61 protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
62 "hbase.table.write.lock.timeout.ms";
63
64
65 protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
66 "hbase.table.read.lock.timeout.ms";
67
68 protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
69 600 * 1000;
70
71 protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
72 600 * 1000;
73
74 public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
75
76 public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
77 600 * 1000;
78
79
80
81
82 @InterfaceAudience.Private
83 public interface TableLock {
84
85
86
87
88
89
90 void acquire() throws IOException;
91
92
93
94
95
96 void release() throws IOException;
97 }
98
99
100
101
102
103
104
105 public abstract TableLock writeLock(TableName tableName, String purpose);
106
107
108
109
110
111
112
113 public abstract TableLock readLock(TableName tableName, String purpose);
114
115
116
117
118
119
120
121 public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
122
123
124
125
126
127
128
129
130 public abstract void reapAllExpiredLocks() throws IOException;
131
132
133
134
135
136
137
138
139
140
141 public abstract void reapWriteLocks() throws IOException;
142
143
144
145
146
147
148
149 public abstract void tableDeleted(TableName tableName)
150 throws IOException;
151
152
153
154
155 public static TableLockManager createTableLockManager(Configuration conf,
156 ZooKeeperWatcher zkWatcher, ServerName serverName) {
157
158 if (conf.getBoolean(TABLE_LOCK_ENABLE,
159 DEFAULT_TABLE_LOCK_ENABLE)) {
160 long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
161 DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
162 long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
163 DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
164 long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
165 DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
166
167 return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
168 }
169
170 return new NullTableLockManager();
171 }
172
173
174
175
176 @InterfaceAudience.Private
177 public static class NullTableLockManager extends TableLockManager {
178 static class NullTableLock implements TableLock {
179 @Override
180 public void acquire() throws IOException {
181 }
182 @Override
183 public void release() throws IOException {
184 }
185 }
186 @Override
187 public TableLock writeLock(TableName tableName, String purpose) {
188 return new NullTableLock();
189 }
190 @Override
191 public TableLock readLock(TableName tableName, String purpose) {
192 return new NullTableLock();
193 }
194 @Override
195 public void reapAllExpiredLocks() throws IOException {
196 }
197 @Override
198 public void reapWriteLocks() throws IOException {
199 }
200 @Override
201 public void tableDeleted(TableName tableName) throws IOException {
202 }
203 @Override
204 public void visitAllLocks(MetadataHandler handler) throws IOException {
205 }
206 }
207
208
209 public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
210 int pblen = ProtobufUtil.lengthOfPBMagic();
211 if (bytes == null || bytes.length < pblen) {
212 return null;
213 }
214 try {
215 ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder();
216 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
217 return builder.build();
218 } catch (IOException ex) {
219 LOG.warn("Exception in deserialization", ex);
220 }
221 return null;
222 }
223
224
225
226
227 @InterfaceAudience.Private
228 private static class ZKTableLockManager extends TableLockManager {
229
230 private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
231 @Override
232 public void handleMetadata(byte[] ownerMetadata) {
233 if (!LOG.isDebugEnabled()) {
234 return;
235 }
236 ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
237 if (data == null) {
238 return;
239 }
240 LOG.debug("Table is locked by " +
241 String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
242 "purpose=%s, isShared=%s, createTime=%s]",
243 data.getTableName().getNamespace().toStringUtf8(),
244 data.getTableName().getQualifier().toStringUtf8(),
245 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
246 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
247 }
248 };
249
250 private static class TableLockImpl implements TableLock {
251 long lockTimeoutMs;
252 TableName tableName;
253 InterProcessLock lock;
254 boolean isShared;
255 ZooKeeperWatcher zkWatcher;
256 ServerName serverName;
257 String purpose;
258
259 public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
260 ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
261 this.tableName = tableName;
262 this.zkWatcher = zkWatcher;
263 this.serverName = serverName;
264 this.lockTimeoutMs = lockTimeoutMs;
265 this.isShared = isShared;
266 this.purpose = purpose;
267 }
268
269 @Override
270 public void acquire() throws IOException {
271 if (LOG.isTraceEnabled()) {
272 LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
273 " lock on: " + tableName + " for:" + purpose);
274 }
275
276 lock = createTableLock();
277 try {
278 if (lockTimeoutMs == -1) {
279
280 lock.acquire();
281 } else {
282 if (!lock.tryAcquire(lockTimeoutMs)) {
283 throw new LockTimeoutException("Timed out acquiring " +
284 (isShared ? "read" : "write") + "lock for table:" + tableName +
285 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
286 }
287 }
288 } catch (InterruptedException e) {
289 LOG.warn("Interrupted acquiring a lock for " + tableName, e);
290 Thread.currentThread().interrupt();
291 throw new InterruptedIOException("Interrupted acquiring a lock");
292 }
293 if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
294 + " lock on " + tableName + " for " + purpose);
295 }
296
297 @Override
298 public void release() throws IOException {
299 if (LOG.isTraceEnabled()) {
300 LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
301 + " lock on " + tableName);
302 }
303 if (lock == null) {
304 throw new IllegalStateException("Table " + tableName +
305 " is not locked!");
306 }
307
308 try {
309 lock.release();
310 } catch (InterruptedException e) {
311 LOG.warn("Interrupted while releasing a lock for " + tableName);
312 throw new InterruptedIOException();
313 }
314 if (LOG.isTraceEnabled()) {
315 LOG.trace("Released table lock on " + tableName);
316 }
317 }
318
319 private InterProcessLock createTableLock() {
320 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
321 tableName.getNameAsString());
322
323 ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
324 .setTableName(ProtobufUtil.toProtoTableName(tableName))
325 .setLockOwner(ProtobufUtil.toServerName(serverName))
326 .setThreadId(Thread.currentThread().getId())
327 .setPurpose(purpose)
328 .setIsShared(isShared)
329 .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
330 byte[] lockMetadata = toBytes(data);
331
332 InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
333 METADATA_HANDLER);
334 return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
335 }
336 }
337
338 private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
339 return ProtobufUtil.prependPBMagic(data.toByteArray());
340 }
341
342 private final ServerName serverName;
343 private final ZooKeeperWatcher zkWatcher;
344 private final long writeLockTimeoutMs;
345 private final long readLockTimeoutMs;
346 private final long lockExpireTimeoutMs;
347
348
349
350
351
352
353
354
355
356
357
358 public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
359 ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
360 this.zkWatcher = zkWatcher;
361 this.serverName = serverName;
362 this.writeLockTimeoutMs = writeLockTimeoutMs;
363 this.readLockTimeoutMs = readLockTimeoutMs;
364 this.lockExpireTimeoutMs = lockExpireTimeoutMs;
365 }
366
367 @Override
368 public TableLock writeLock(TableName tableName, String purpose) {
369 return new TableLockImpl(tableName, zkWatcher,
370 serverName, writeLockTimeoutMs, false, purpose);
371 }
372
373 public TableLock readLock(TableName tableName, String purpose) {
374 return new TableLockImpl(tableName, zkWatcher,
375 serverName, readLockTimeoutMs, true, purpose);
376 }
377
378 public void visitAllLocks(MetadataHandler handler) throws IOException {
379 for (String tableName : getTableNames()) {
380 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
381 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
382 zkWatcher, tableLockZNode, null);
383 lock.readLock(null).visitLocks(handler);
384 lock.writeLock(null).visitLocks(handler);
385 }
386 }
387
388 private List<String> getTableNames() throws IOException {
389
390 List<String> tableNames;
391 try {
392 tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
393 } catch (KeeperException e) {
394 LOG.error("Unexpected ZooKeeper error when listing children", e);
395 throw new IOException("Unexpected ZooKeeper exception", e);
396 }
397 return tableNames;
398 }
399
400 @Override
401 public void reapWriteLocks() throws IOException {
402
403 try {
404 for (String tableName : getTableNames()) {
405 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
406 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
407 zkWatcher, tableLockZNode, null);
408 lock.writeLock(null).reapAllLocks();
409 }
410 } catch (IOException ex) {
411 throw ex;
412 } catch (Exception ex) {
413 LOG.warn("Caught exception while reaping table write locks", ex);
414 }
415 }
416
417 @Override
418 public void reapAllExpiredLocks() throws IOException {
419
420 try {
421 for (String tableName : getTableNames()) {
422 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
423 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
424 zkWatcher, tableLockZNode, null);
425 lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
426 lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
427 }
428 } catch (IOException ex) {
429 throw ex;
430 } catch (Exception ex) {
431 throw new IOException(ex);
432 }
433 }
434
435 @Override
436 public void tableDeleted(TableName tableName) throws IOException {
437
438 String tableNameStr = tableName.getNameAsString();
439 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
440 try {
441 ZKUtil.deleteNode(zkWatcher, tableLockZNode);
442 } catch (KeeperException ex) {
443 if (ex.code() == KeeperException.Code.NOTEMPTY) {
444
445
446 LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
447 + tableLockZNode);
448 return;
449 }
450 throw new IOException(ex);
451 }
452 }
453 }
454 }