001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.locking; 019 020import java.io.IOException; 021import java.util.concurrent.CountDownLatch; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicLong; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.RegionInfo; 027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 028import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; 029import org.apache.hadoop.hbase.procedure2.LockType; 030import org.apache.hadoop.hbase.procedure2.Procedure; 031import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 043 044/** 045 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks 046 * used by the procedure framework isolating procedures doing creates/deletes etc. on 047 * table/namespace/regions. This procedure when scheduled, acquires specified locks, suspends itself 048 * and waits for: 049 * <ul> 050 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li> 051 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding 052 * the lock or not based on last heartbeat timestamp.</li> 053 * </ul> 054 */ 055@InterfaceAudience.Private 056public final class LockProcedure extends Procedure<MasterProcedureEnv> 057 implements TableProcedureInterface { 058 private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class); 059 060 public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000; // timeout in ms 061 public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF = 062 "hbase.master.procedure.remote.locks.timeout.ms"; 063 // 10 min. Same as old ZK lock timeout. 064 public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000; 065 public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF = 066 "hbase.master.procedure.local.master.locks.timeout.ms"; 067 068 private String namespace; 069 private TableName tableName; 070 private RegionInfo[] regionInfos; 071 private LockType type; 072 // underlying namespace/table/region lock. 073 private LockInterface lock; 074 private TableOperationType opType; 075 private String description; 076 // True when recovery of master lock from WALs 077 private boolean recoveredMasterLock; 078 079 private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this); 080 // True if this proc acquired relevant locks. This value is for client checks. 081 private final AtomicBoolean locked = new AtomicBoolean(false); 082 // Last system time (in ms) when client sent the heartbeat. 083 // Initialize to system time for non-null value in case of recovery. 084 private final AtomicLong lastHeartBeat = new AtomicLong(); 085 // Set to true when unlock request is received. 086 private final AtomicBoolean unlock = new AtomicBoolean(false); 087 // decreased when locks are acquired. Only used for local (with master process) purposes. 088 // Setting latch to non-null value increases default timeout to 089 // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat. 090 private final CountDownLatch lockAcquireLatch; 091 092 private volatile boolean suspended = false; 093 094 @Override 095 public TableName getTableName() { 096 return tableName; 097 } 098 099 @Override 100 public TableOperationType getTableOperationType() { 101 return opType; 102 } 103 104 private interface LockInterface { 105 boolean acquireLock(MasterProcedureEnv env); 106 107 void releaseLock(MasterProcedureEnv env); 108 } 109 110 public LockProcedure() { 111 lockAcquireLatch = null; 112 } 113 114 private LockProcedure(final Configuration conf, final LockType type, final String description, 115 final CountDownLatch lockAcquireLatch) { 116 this.type = type; 117 this.description = description; 118 this.lockAcquireLatch = lockAcquireLatch; 119 if (lockAcquireLatch == null) { 120 setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS)); 121 } else { 122 setTimeout( 123 conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS)); 124 } 125 } 126 127 /** 128 * Constructor for namespace lock. 129 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. 130 */ 131 public LockProcedure(final Configuration conf, final String namespace, final LockType type, 132 final String description, final CountDownLatch lockAcquireLatch) 133 throws IllegalArgumentException { 134 this(conf, type, description, lockAcquireLatch); 135 136 if (namespace.isEmpty()) { 137 throw new IllegalArgumentException("Empty namespace"); 138 } 139 140 this.namespace = namespace; 141 this.lock = setupNamespaceLock(); 142 } 143 144 /** 145 * Constructor for table lock. 146 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. 147 */ 148 public LockProcedure(final Configuration conf, final TableName tableName, final LockType type, 149 final String description, final CountDownLatch lockAcquireLatch) 150 throws IllegalArgumentException { 151 this(conf, type, description, lockAcquireLatch); 152 153 this.tableName = tableName; 154 this.lock = setupTableLock(); 155 } 156 157 /** 158 * Constructor for region lock(s). 159 * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. Useful for 160 * locks acquired locally from master process. 161 * @throws IllegalArgumentException if all regions are not from same table. 162 */ 163 public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos, 164 final LockType type, final String description, final CountDownLatch lockAcquireLatch) 165 throws IllegalArgumentException { 166 this(conf, type, description, lockAcquireLatch); 167 168 // Build RegionInfo from region names. 169 if (regionInfos.length == 0) { 170 throw new IllegalArgumentException("No regions specified for region lock"); 171 } 172 173 // check all regions belong to same table. 174 final TableName regionTable = regionInfos[0].getTable(); 175 for (int i = 1; i < regionInfos.length; ++i) { 176 if (!regionInfos[i].getTable().equals(regionTable)) { 177 throw new IllegalArgumentException("All regions should be from same table"); 178 } 179 } 180 181 this.regionInfos = regionInfos; 182 this.lock = setupRegionLock(); 183 } 184 185 private boolean hasHeartbeatExpired() { 186 return EnvironmentEdgeManager.currentTime() - lastHeartBeat.get() >= getTimeout(); 187 } 188 189 /** 190 * Updates timeout deadline for the lock. 191 */ 192 public void updateHeartBeat() { 193 lastHeartBeat.set(EnvironmentEdgeManager.currentTime()); 194 if (LOG.isDebugEnabled()) { 195 LOG.debug("Heartbeat " + toString()); 196 } 197 } 198 199 /** 200 * Re run the procedure after every timeout to write new WAL entries so we don't hold back old 201 * WALs. 202 * @return false, so procedure framework doesn't mark this procedure as failure. 203 */ 204 @Override 205 protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) { 206 synchronized (event) { 207 if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event); 208 if (!event.isReady()) { // Maybe unlock() awakened the event. 209 setState(ProcedureProtos.ProcedureState.RUNNABLE); 210 if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event); 211 event.wake(env.getProcedureScheduler()); 212 } 213 } 214 return false; // false: do not mark the procedure as failed. 215 } 216 217 // Can be called before procedure gets scheduled, in which case, the execute() will finish 218 // immediately and release the underlying locks. 219 public void unlock(final MasterProcedureEnv env) { 220 unlock.set(true); 221 locked.set(false); 222 // Maybe timeout already awakened the event and the procedure has finished. 223 synchronized (event) { 224 if (!event.isReady() && suspended) { 225 setState(ProcedureProtos.ProcedureState.RUNNABLE); 226 event.wake(env.getProcedureScheduler()); 227 suspended = false; 228 } 229 } 230 } 231 232 @Override 233 protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env) 234 throws ProcedureSuspendedException { 235 // Local master locks don't store any state, so on recovery, simply finish this procedure 236 // immediately. 237 if (recoveredMasterLock) return null; 238 if (lockAcquireLatch != null) { 239 lockAcquireLatch.countDown(); 240 } 241 if (unlock.get() || hasHeartbeatExpired()) { 242 locked.set(false); 243 LOG.debug((unlock.get() ? "UNLOCKED " : "TIMED OUT ") + toString()); 244 return null; 245 } 246 synchronized (event) { 247 event.suspend(); 248 event.suspendIfNotReady(this); 249 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 250 suspended = true; 251 } 252 throw new ProcedureSuspendedException(); 253 } 254 255 @Override 256 protected void rollback(final MasterProcedureEnv env) { 257 throw new UnsupportedOperationException(); 258 } 259 260 @Override 261 protected boolean abort(final MasterProcedureEnv env) { 262 unlock(env); 263 return true; 264 } 265 266 @Override 267 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 268 final LockProcedureData.Builder builder = LockProcedureData.newBuilder() 269 .setLockType(LockServiceProtos.LockType.valueOf(type.name())).setDescription(description); 270 if (regionInfos != null) { 271 for (int i = 0; i < regionInfos.length; ++i) { 272 builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i])); 273 } 274 } else if (namespace != null) { 275 builder.setNamespace(namespace); 276 } else if (tableName != null) { 277 builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); 278 } 279 if (lockAcquireLatch != null) { 280 builder.setIsMasterLock(true); 281 } 282 serializer.serialize(builder.build()); 283 } 284 285 @Override 286 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 287 final LockProcedureData state = serializer.deserialize(LockProcedureData.class); 288 type = LockType.valueOf(state.getLockType().name()); 289 description = state.getDescription(); 290 if (state.getRegionInfoCount() > 0) { 291 regionInfos = new RegionInfo[state.getRegionInfoCount()]; 292 for (int i = 0; i < state.getRegionInfoCount(); ++i) { 293 regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i)); 294 } 295 } else if (state.hasNamespace()) { 296 namespace = state.getNamespace(); 297 } else if (state.hasTableName()) { 298 tableName = ProtobufUtil.toTableName(state.getTableName()); 299 } 300 recoveredMasterLock = state.getIsMasterLock(); 301 this.lock = setupLock(); 302 } 303 304 @Override 305 protected LockState acquireLock(final MasterProcedureEnv env) { 306 boolean ret = lock.acquireLock(env); 307 locked.set(ret); 308 if (ret) { 309 if (LOG.isDebugEnabled()) { 310 LOG.debug("LOCKED " + toString()); 311 } 312 lastHeartBeat.set(EnvironmentEdgeManager.currentTime()); 313 return LockState.LOCK_ACQUIRED; 314 } 315 LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING"); 316 return LockState.LOCK_EVENT_WAIT; 317 } 318 319 @Override 320 protected void releaseLock(final MasterProcedureEnv env) { 321 lock.releaseLock(env); 322 } 323 324 /** 325 * On recovery, re-execute from start to acquire the locks. Need to explicitly set it to RUNNABLE 326 * because the procedure might have been in WAITING_TIMEOUT state when crash happened. In which 327 * case, it'll be sent back to timeout queue on recovery, which we don't want since we want to 328 * require locks. 329 */ 330 @Override 331 protected void beforeReplay(MasterProcedureEnv env) { 332 setState(ProcedureProtos.ProcedureState.RUNNABLE); 333 } 334 335 @Override 336 protected void toStringClassDetails(final StringBuilder builder) { 337 super.toStringClassDetails(builder); 338 if (regionInfos != null) { 339 builder.append(" regions="); 340 for (int i = 0; i < regionInfos.length; ++i) { 341 if (i > 0) builder.append(","); 342 builder.append(regionInfos[i].getShortNameToLog()); 343 } 344 } else if (namespace != null) { 345 builder.append(", namespace=").append(namespace); 346 } else if (tableName != null) { 347 builder.append(", tableName=").append(tableName); 348 } 349 builder.append(", type=").append(type); 350 } 351 352 public LockType getType() { 353 return type; 354 } 355 356 private LockInterface setupLock() throws IllegalArgumentException { 357 if (regionInfos != null) { 358 return setupRegionLock(); 359 } else if (namespace != null) { 360 return setupNamespaceLock(); 361 } else if (tableName != null) { 362 return setupTableLock(); 363 } else { 364 LOG.error("Unknown level specified in " + toString()); 365 throw new IllegalArgumentException("no namespace/table/region provided"); 366 } 367 } 368 369 private LockInterface setupNamespaceLock() throws IllegalArgumentException { 370 this.tableName = TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME; 371 switch (type) { 372 case EXCLUSIVE: 373 this.opType = TableOperationType.EDIT; 374 return new NamespaceExclusiveLock(); 375 case SHARED: 376 LOG.error("Shared lock on namespace not supported for " + toString()); 377 throw new IllegalArgumentException("Shared lock on namespace not supported"); 378 default: 379 LOG.error("Unexpected lock type " + toString()); 380 throw new IllegalArgumentException("Wrong lock type: " + type.toString()); 381 } 382 } 383 384 private LockInterface setupTableLock() throws IllegalArgumentException { 385 switch (type) { 386 case EXCLUSIVE: 387 this.opType = TableOperationType.EDIT; 388 return new TableExclusiveLock(); 389 case SHARED: 390 this.opType = TableOperationType.READ; 391 return new TableSharedLock(); 392 default: 393 LOG.error("Unexpected lock type " + toString()); 394 throw new IllegalArgumentException("Wrong lock type:" + type.toString()); 395 } 396 } 397 398 private LockInterface setupRegionLock() throws IllegalArgumentException { 399 this.tableName = regionInfos[0].getTable(); 400 switch (type) { 401 case EXCLUSIVE: 402 this.opType = TableOperationType.REGION_EDIT; 403 return new RegionExclusiveLock(); 404 default: 405 LOG.error("Only exclusive lock supported on regions for " + toString()); 406 throw new IllegalArgumentException("Only exclusive lock supported on regions."); 407 } 408 } 409 410 public String getDescription() { 411 return description; 412 } 413 414 public boolean isLocked() { 415 return locked.get(); 416 } 417 418 @Override 419 public boolean holdLock(final MasterProcedureEnv env) { 420 return true; 421 } 422 423 /////////////////////// 424 // LOCK IMPLEMENTATIONS 425 /////////////////////// 426 427 private class TableExclusiveLock implements LockInterface { 428 @Override 429 public boolean acquireLock(final MasterProcedureEnv env) { 430 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 431 // to get the lock and false if you don't; i.e. you got the lock. 432 return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName); 433 } 434 435 @Override 436 public void releaseLock(final MasterProcedureEnv env) { 437 env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName); 438 } 439 } 440 441 private class TableSharedLock implements LockInterface { 442 @Override 443 public boolean acquireLock(final MasterProcedureEnv env) { 444 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 445 // to get the lock and false if you don't; i.e. you got the lock. 446 return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName); 447 } 448 449 @Override 450 public void releaseLock(final MasterProcedureEnv env) { 451 env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName); 452 } 453 } 454 455 private class NamespaceExclusiveLock implements LockInterface { 456 @Override 457 public boolean acquireLock(final MasterProcedureEnv env) { 458 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 459 // to get the lock and false if you don't; i.e. you got the lock. 460 return !env.getProcedureScheduler().waitNamespaceExclusiveLock(LockProcedure.this, namespace); 461 } 462 463 @Override 464 public void releaseLock(final MasterProcedureEnv env) { 465 env.getProcedureScheduler().wakeNamespaceExclusiveLock(LockProcedure.this, namespace); 466 } 467 } 468 469 private class RegionExclusiveLock implements LockInterface { 470 @Override 471 public boolean acquireLock(final MasterProcedureEnv env) { 472 // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT 473 // to get the lock and false if you don't; i.e. you got the lock. 474 return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos); 475 } 476 477 @Override 478 public void releaseLock(final MasterProcedureEnv env) { 479 env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos); 480 } 481 } 482}