001/** 002 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.master.locking; 021 022import java.io.IOException; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; 032import org.apache.hadoop.hbase.procedure2.LockType; 033import org.apache.hadoop.hbase.procedure2.Procedure; 034import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 035import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 044 045/** 046 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks 047 * used by the procedure framework isolating procedures doing creates/deletes etc. on 048 * table/namespace/regions. 049 * This procedure when scheduled, acquires specified locks, suspends itself and waits for: 050 * <ul> 051 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li> 052 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding 053 * the lock or not based on last heartbeat timestamp.</li> 054 * </ul> 055 */ 056@InterfaceAudience.Private 057public final class LockProcedure extends Procedure<MasterProcedureEnv> 058 implements TableProcedureInterface { 059 private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class); 060 061 public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000; // timeout in ms 062 public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF = 063 "hbase.master.procedure.remote.locks.timeout.ms"; 064 // 10 min. Same as old ZK lock timeout. 065 public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000; 066 public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF = 067 "hbase.master.procedure.local.master.locks.timeout.ms"; 068 069 private String namespace; 070 private TableName tableName; 071 private RegionInfo[] regionInfos; 072 private LockType type; 073 // underlying namespace/table/region lock. 074 private LockInterface lock; 075 private TableOperationType opType; 076 private String description; 077 // True when recovery of master lock from WALs 078 private boolean recoveredMasterLock; 079 080 private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this); 081 // True if this proc acquired relevant locks. This value is for client checks. 082 private final AtomicBoolean locked = new AtomicBoolean(false); 083 // Last system time (in ms) when client sent the heartbeat. 084 // Initialize to system time for non-null value in case of recovery. 085 private final AtomicLong lastHeartBeat = new AtomicLong(); 086 // Set to true when unlock request is received. 087 private final AtomicBoolean unlock = new AtomicBoolean(false); 088 // decreased when locks are acquired. Only used for local (with master process) purposes. 089 // Setting latch to non-null value increases default timeout to 090 // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat. 091 private final CountDownLatch lockAcquireLatch; 092 093 private volatile boolean suspended = false; 094 095 @Override 096 public TableName getTableName() { 097 return tableName; 098 } 099 100 @Override 101 public TableOperationType getTableOperationType() { 102 return opType; 103 } 104 105 private interface LockInterface { 106 boolean acquireLock(MasterProcedureEnv env); 107 void releaseLock(MasterProcedureEnv env); 108 } 109 110 public LockProcedure() { 111 lockAcquireLatch = null; 112 } 113 114 private LockProcedure(final Configuration conf, final LockType type, 115 final String description, final CountDownLatch lockAcquireLatch) { 116 this.type = type; 117 this.description = description; 118 this.lockAcquireLatch = lockAcquireLatch; 119 if (lockAcquireLatch == null) { 120 setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS)); 121 } else { 122 setTimeout(conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, 123 DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS)); 124 } 125 } 126 127 /** 128 * Constructor for namespace lock. 129 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. 130 */ 131 public LockProcedure(final Configuration conf, final String namespace, final LockType type, 132 final String description, final CountDownLatch lockAcquireLatch) 133 throws IllegalArgumentException { 134 this(conf, type, description, lockAcquireLatch); 135 136 if (namespace.isEmpty()) { 137 throw new IllegalArgumentException("Empty namespace"); 138 } 139 140 this.namespace = namespace; 141 this.lock = setupNamespaceLock(); 142 } 143 144 /** 145 * Constructor for table lock. 146 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. 147 */ 148 public LockProcedure(final Configuration conf, final TableName tableName, final LockType type, 149 final String description, final CountDownLatch lockAcquireLatch) 150 throws IllegalArgumentException { 151 this(conf, type, description, lockAcquireLatch); 152 153 this.tableName = tableName; 154 this.lock = setupTableLock(); 155 } 156 157 /** 158 * Constructor for region lock(s). 159 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. 160 * Useful for locks acquired locally from master process. 161 * @throws IllegalArgumentException if all regions are not from same table. 162 */ 163 public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos, 164 final LockType type, final String description, final CountDownLatch lockAcquireLatch) 165 throws IllegalArgumentException { 166 this(conf, type, description, lockAcquireLatch); 167 168 // Build RegionInfo from region names. 169 if (regionInfos.length == 0) { 170 throw new IllegalArgumentException("No regions specified for region lock"); 171 } 172 173 // check all regions belong to same table. 174 final TableName regionTable = regionInfos[0].getTable(); 175 for (int i = 1; i < regionInfos.length; ++i) { 176 if (!regionInfos[i].getTable().equals(regionTable)) { 177 throw new IllegalArgumentException("All regions should be from same table"); 178 } 179 } 180 181 this.regionInfos = regionInfos; 182 this.lock = setupRegionLock(); 183 } 184 185 private boolean hasHeartbeatExpired() { 186 return System.currentTimeMillis() - lastHeartBeat.get() >= getTimeout(); 187 } 188 189 /** 190 * Updates timeout deadline for the lock. 191 */ 192 public void updateHeartBeat() { 193 lastHeartBeat.set(System.currentTimeMillis()); 194 if (LOG.isDebugEnabled()) { 195 LOG.debug("Heartbeat " + toString()); 196 } 197 } 198 199 /** 200 * Re run the procedure after every timeout to write new WAL entries so we don't hold back old 201 * WALs. 202 * @return false, so procedure framework doesn't mark this procedure as failure. 203 */ 204 @Override 205 protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) { 206 synchronized (event) { 207 if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event); 208 if (!event.isReady()) { // Maybe unlock() awakened the event. 209 setState(ProcedureProtos.ProcedureState.RUNNABLE); 210 if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event); 211 event.wake(env.getProcedureScheduler()); 212 } 213 } 214 return false; // false: do not mark the procedure as failed. 215 } 216 217 // Can be called before procedure gets scheduled, in which case, the execute() will finish 218 // immediately and release the underlying locks. 219 public void unlock(final MasterProcedureEnv env) { 220 unlock.set(true); 221 locked.set(false); 222 // Maybe timeout already awakened the event and the procedure has finished. 223 synchronized (event) { 224 if (!event.isReady() && suspended) { 225 setState(ProcedureProtos.ProcedureState.RUNNABLE); 226 event.wake(env.getProcedureScheduler()); 227 suspended = false; 228 } 229 } 230 } 231 232 @Override 233 protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env) 234 throws ProcedureSuspendedException { 235 // Local master locks don't store any state, so on recovery, simply finish this procedure 236 // immediately. 237 if (recoveredMasterLock) return null; 238 if (lockAcquireLatch != null) { 239 lockAcquireLatch.countDown(); 240 } 241 if (unlock.get() || hasHeartbeatExpired()) { 242 locked.set(false); 243 LOG.debug((unlock.get()? "UNLOCKED " : "TIMED OUT ") + toString()); 244 return null; 245 } 246 synchronized (event) { 247 event.suspend(); 248 event.suspendIfNotReady(this); 249 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 250 suspended = true; 251 } 252 throw new ProcedureSuspendedException(); 253 } 254 255 @Override 256 protected void rollback(final MasterProcedureEnv env) { 257 throw new UnsupportedOperationException(); 258 } 259 260 @Override 261 protected boolean abort(final MasterProcedureEnv env) { 262 unlock(env); 263 return true; 264 } 265 266 @Override 267 protected void serializeStateData(ProcedureStateSerializer serializer) 268 throws IOException { 269 final LockProcedureData.Builder builder = LockProcedureData.newBuilder() 270 .setLockType(LockServiceProtos.LockType.valueOf(type.name())) 271 .setDescription(description); 272 if (regionInfos != null) { 273 for (int i = 0; i < regionInfos.length; ++i) { 274 builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i])); 275 } 276 } else if (namespace != null) { 277 builder.setNamespace(namespace); 278 } else if (tableName != null) { 279 builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); 280 } 281 if (lockAcquireLatch != null) { 282 builder.setIsMasterLock(true); 283 } 284 serializer.serialize(builder.build()); 285 } 286 287 @Override 288 protected void deserializeStateData(ProcedureStateSerializer serializer) 289 throws IOException { 290 final LockProcedureData state = serializer.deserialize(LockProcedureData.class); 291 type = LockType.valueOf(state.getLockType().name()); 292 description = state.getDescription(); 293 if (state.getRegionInfoCount() > 0) { 294 regionInfos = new RegionInfo[state.getRegionInfoCount()]; 295 for (int i = 0; i < state.getRegionInfoCount(); ++i) { 296 regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i)); 297 } 298 } else if (state.hasNamespace()) { 299 namespace = state.getNamespace(); 300 } else if (state.hasTableName()) { 301 tableName = ProtobufUtil.toTableName(state.getTableName()); 302 } 303 recoveredMasterLock = state.getIsMasterLock(); 304 this.lock = setupLock(); 305 } 306 307 @Override 308 protected LockState acquireLock(final MasterProcedureEnv env) { 309 boolean ret = lock.acquireLock(env); 310 locked.set(ret); 311 if (ret) { 312 if (LOG.isDebugEnabled()) { 313 LOG.debug("LOCKED " + toString()); 314 } 315 lastHeartBeat.set(System.currentTimeMillis()); 316 return LockState.LOCK_ACQUIRED; 317 } 318 LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING"); 319 return LockState.LOCK_EVENT_WAIT; 320 } 321 322 @Override 323 protected void releaseLock(final MasterProcedureEnv env) { 324 lock.releaseLock(env); 325 } 326 327 /** 328 * On recovery, re-execute from start to acquire the locks. 329 * Need to explicitly set it to RUNNABLE because the procedure might have been in WAITING_TIMEOUT 330 * state when crash happened. In which case, it'll be sent back to timeout queue on recovery, 331 * which we don't want since we want to require locks. 332 */ 333 @Override 334 protected void beforeReplay(MasterProcedureEnv env) { 335 setState(ProcedureProtos.ProcedureState.RUNNABLE); 336 } 337 338 @Override 339 protected void toStringClassDetails(final StringBuilder builder) { 340 super.toStringClassDetails(builder); 341 if (regionInfos != null) { 342 builder.append(" regions="); 343 for (int i = 0; i < regionInfos.length; ++i) { 344 if (i > 0) builder.append(","); 345 builder.append(regionInfos[i].getShortNameToLog()); 346 } 347 } else if (namespace != null) { 348 builder.append(", namespace=").append(namespace); 349 } else if (tableName != null) { 350 builder.append(", tableName=").append(tableName); 351 } 352 builder.append(", type=").append(type); 353 } 354 355 public LockType getType() { 356 return type; 357 } 358 359 private LockInterface setupLock() throws IllegalArgumentException { 360 if (regionInfos != null) { 361 return setupRegionLock(); 362 } else if (namespace != null) { 363 return setupNamespaceLock(); 364 } else if (tableName != null) { 365 return setupTableLock(); 366 } else { 367 LOG.error("Unknown level specified in " + toString()); 368 throw new IllegalArgumentException("no namespace/table/region provided"); 369 } 370 } 371 372 private LockInterface setupNamespaceLock() throws IllegalArgumentException { 373 this.tableName = TableName.NAMESPACE_TABLE_NAME; 374 switch (type) { 375 case EXCLUSIVE: 376 this.opType = TableOperationType.EDIT; 377 return new NamespaceExclusiveLock(); 378 case SHARED: 379 LOG.error("Shared lock on namespace not supported for " + toString()); 380 throw new IllegalArgumentException("Shared lock on namespace not supported"); 381 default: 382 LOG.error("Unexpected lock type " + toString()); 383 throw new IllegalArgumentException("Wrong lock type: " + type.toString()); 384 } 385 } 386 387 private LockInterface setupTableLock() throws IllegalArgumentException { 388 switch (type) { 389 case EXCLUSIVE: 390 this.opType = TableOperationType.EDIT; 391 return new TableExclusiveLock(); 392 case SHARED: 393 this.opType = TableOperationType.READ; 394 return new TableSharedLock(); 395 default: 396 LOG.error("Unexpected lock type " + toString()); 397 throw new IllegalArgumentException("Wrong lock type:" + type.toString()); 398 } 399 } 400 401 private LockInterface setupRegionLock() throws IllegalArgumentException { 402 this.tableName = regionInfos[0].getTable(); 403 switch (type) { 404 case EXCLUSIVE: 405 this.opType = TableOperationType.REGION_EDIT; 406 return new RegionExclusiveLock(); 407 default: 408 LOG.error("Only exclusive lock supported on regions for " + toString()); 409 throw new IllegalArgumentException("Only exclusive lock supported on regions."); 410 } 411 } 412 413 public String getDescription() { 414 return description; 415 } 416 417 public boolean isLocked() { 418 return locked.get(); 419 } 420 421 @Override 422 public boolean holdLock(final MasterProcedureEnv env) { 423 return true; 424 } 425 426 /////////////////////// 427 // LOCK IMPLEMENTATIONS 428 /////////////////////// 429 430 private class TableExclusiveLock implements LockInterface { 431 @Override 432 public boolean acquireLock(final MasterProcedureEnv env) { 433 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 434 // to get the lock and false if you don't; i.e. you got the lock. 435 return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName); 436 } 437 438 @Override 439 public void releaseLock(final MasterProcedureEnv env) { 440 env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName); 441 } 442 } 443 444 private class TableSharedLock implements LockInterface { 445 @Override 446 public boolean acquireLock(final MasterProcedureEnv env) { 447 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 448 // to get the lock and false if you don't; i.e. you got the lock. 449 return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName); 450 } 451 452 @Override 453 public void releaseLock(final MasterProcedureEnv env) { 454 env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName); 455 } 456 } 457 458 private class NamespaceExclusiveLock implements LockInterface { 459 @Override 460 public boolean acquireLock(final MasterProcedureEnv env) { 461 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 462 // to get the lock and false if you don't; i.e. you got the lock. 463 return !env.getProcedureScheduler().waitNamespaceExclusiveLock( 464 LockProcedure.this, namespace); 465 } 466 467 @Override 468 public void releaseLock(final MasterProcedureEnv env) { 469 env.getProcedureScheduler().wakeNamespaceExclusiveLock( 470 LockProcedure.this, namespace); 471 } 472 } 473 474 private class RegionExclusiveLock implements LockInterface { 475 @Override 476 public boolean acquireLock(final MasterProcedureEnv env) { 477 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 478 // to get the lock and false if you don't; i.e. you got the lock. 479 return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos); 480 } 481 482 @Override 483 public void releaseLock(final MasterProcedureEnv env) { 484 env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos); 485 } 486 } 487}