/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.locking;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks
 * used by the procedure framework isolating procedures doing creates/deletes etc. on
 * table/namespace/regions.
 * This procedure when scheduled, acquires specified locks, suspends itself and waits for:
 * <ul>
 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li>
 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
 * the lock or not based on last heartbeat timestamp.</li>
 * </ul>
 */
@InterfaceAudience.Private
public final class LockProcedure extends Procedure<MasterProcedureEnv>
    implements TableProcedureInterface {
  private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class);

  public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000;  // timeout in ms
  public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF =
      "hbase.master.procedure.remote.locks.timeout.ms";
  // 10 min. Same as old ZK lock timeout.
  public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000;
  public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF =
      "hbase.master.procedure.local.master.locks.timeout.ms";

  // Exactly one of namespace / tableName / regionInfos selects the lock level. Note that the
  // namespace and region setups also fill in tableName (see setupNamespaceLock/setupRegionLock).
  private String namespace;
  private TableName tableName;
  private RegionInfo[] regionInfos;
  private LockType type;
  // underlying namespace/table/region lock.
  private LockInterface lock;
  private TableOperationType opType;
  private String description;
  // True when this procedure was deserialized from state flagged as a local master lock.
  private boolean recoveredMasterLock;

  private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
  // True if this proc acquired relevant locks. This value is for client checks.
  private final AtomicBoolean locked = new AtomicBoolean(false);
  // Last system time (in ms) when client sent the heartbeat. Starts at 0 and is set to the
  // current system time by acquireLock() (on success) and by updateHeartBeat().
  private final AtomicLong lastHeartBeat = new AtomicLong();
  // Set to true when unlock request is received.
  private final AtomicBoolean unlock = new AtomicBoolean(false);
  // Counted down from execute(). Only used for local (with master process) purposes.
  // Setting latch to non-null value increases default timeout to
  // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat.
  private final CountDownLatch lockAcquireLatch;

  @Override
  public TableName getTableName() {
    return tableName;
  }

  @Override
  public TableOperationType getTableOperationType() {
    return opType;
  }

  /** Strategy for taking and releasing the concrete namespace/table/region lock. */
  private interface LockInterface {
    /** @return true if the lock was obtained, false if the procedure has to wait for it. */
    boolean acquireLock(MasterProcedureEnv env);

    void releaseLock(MasterProcedureEnv env);
  }

  /**
   * No-arg constructor; leaves every field unset except the latch, which is null. The remaining
   * state is filled in by {@link #deserializeStateData(ProcedureStateSerializer)}.
   */
  public LockProcedure() {
    lockAcquireLatch = null;
  }

  /**
   * Common part of the public constructors: records type, description and latch and picks the
   * timeout. Without a latch the remote timeout is used, with a latch the (longer by default)
   * local master timeout.
   */
  private LockProcedure(final Configuration conf, final LockType type,
      final String description, final CountDownLatch lockAcquireLatch) {
    this.type = type;
    this.description = description;
    this.lockAcquireLatch = lockAcquireLatch;
    if (lockAcquireLatch == null) {
      setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS));
    } else {
      setTimeout(conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF,
          DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS));
    }
  }

  /**
   * Constructor for namespace lock.
   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
   * @throws IllegalArgumentException if the namespace is null or empty, or the lock type is not
   *     supported on namespaces.
   */
  public LockProcedure(final Configuration conf, final String namespace, final LockType type,
      final String description, final CountDownLatch lockAcquireLatch)
      throws IllegalArgumentException {
    this(conf, type, description, lockAcquireLatch);

    // A null namespace is rejected the same way as an empty one instead of surfacing as an NPE.
    if (namespace == null || namespace.isEmpty()) {
      throw new IllegalArgumentException("Empty namespace");
    }

    this.namespace = namespace;
    this.lock = setupNamespaceLock();
  }

  /**
   * Constructor for table lock.
   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
   * @throws IllegalArgumentException if the lock type is not supported on tables.
   */
  public LockProcedure(final Configuration conf, final TableName tableName, final LockType type,
      final String description, final CountDownLatch lockAcquireLatch)
      throws IllegalArgumentException {
    this(conf, type, description, lockAcquireLatch);

    this.tableName = tableName;
    this.lock = setupTableLock();
  }

  /**
   * Constructor for region lock(s).
   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
   *        Useful for locks acquired locally from master process.
   * @throws IllegalArgumentException if no regions are given, if all regions are not from same
   *     table, or if the lock type is not supported on regions.
   */
  public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos,
      final LockType type, final String description, final CountDownLatch lockAcquireLatch)
      throws IllegalArgumentException {
    this(conf, type, description, lockAcquireLatch);

    // A null array is rejected the same way as an empty one instead of surfacing as an NPE.
    if (regionInfos == null || regionInfos.length == 0) {
      throw new IllegalArgumentException("No regions specified for region lock");
    }

    // check all regions belong to same table.
    final TableName regionTable = regionInfos[0].getTable();
    for (int i = 1; i < regionInfos.length; ++i) {
      if (!regionInfos[i].getTable().equals(regionTable)) {
        throw new IllegalArgumentException("All regions should be from same table");
      }
    }

    this.regionInfos = regionInfos;
    this.lock = setupRegionLock();
  }

  /** @return true if at least getTimeout() ms have passed since the last recorded heartbeat. */
  private boolean hasHeartbeatExpired() {
    return System.currentTimeMillis() - lastHeartBeat.get() >= getTimeout();
  }

  /**
   * Updates timeout deadline for the lock.
   */
  public void updateHeartBeat() {
    lastHeartBeat.set(System.currentTimeMillis());
    LOG.debug("Heartbeat {}", this);
  }

  /**
   * Re run the procedure after every timeout to write new WAL entries so we don't hold back old
   * WALs.
   * @return false, so procedure framework doesn't mark this procedure as failure.
   */
  @Override
  protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) {
    synchronized (event) {
      LOG.debug("Timeout failure {}", this.event);
      if (!event.isReady()) {  // Maybe unlock() awakened the event.
        setState(ProcedureProtos.ProcedureState.RUNNABLE);
        LOG.debug("Calling wake on {}", this.event);
        event.wake(env.getProcedureScheduler());
      }
    }
    return false;  // false: do not mark the procedure as failed.
  }

  /**
   * Requests release of the lock and wakes the procedure if it is currently suspended.
   * Can be called before procedure gets scheduled, in which case, the execute() will finish
   * immediately and release the underlying locks.
   */
  public void unlock(final MasterProcedureEnv env) {
    unlock.set(true);
    locked.set(false);
    // Maybe timeout already awakened the event and the procedure has finished.
    synchronized (event) {
      if (!event.isReady()) {
        setState(ProcedureProtos.ProcedureState.RUNNABLE);
        event.wake(env.getProcedureScheduler());
      }
    }
  }

  /**
   * Finishes (returns null) when this is a recovered master lock, when unlock was requested or
   * when the heartbeat expired; otherwise suspends on the event in WAITING_TIMEOUT state.
   * @throws ProcedureSuspendedException whenever the procedure keeps holding the lock and waits.
   */
  @Override
  protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env)
      throws ProcedureSuspendedException {
    // Local master locks don't store any state, so on recovery, simply finish this procedure
    // immediately.
    if (recoveredMasterLock) {
      return null;
    }
    if (lockAcquireLatch != null) {
      lockAcquireLatch.countDown();
    }
    if (unlock.get() || hasHeartbeatExpired()) {
      locked.set(false);
      LOG.debug("{}{}", unlock.get() ? "UNLOCKED " : "TIMED OUT ", this);
      return null;
    }
    synchronized (event) {
      event.suspend();
      event.suspendIfNotReady(this);
      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    }
    throw new ProcedureSuspendedException();
  }

  /** Rollback is not supported for lock procedures. */
  @Override
  protected void rollback(final MasterProcedureEnv env) {
    throw new UnsupportedOperationException();
  }

  /** Abort is implemented as an unlock request; always reports true. */
  @Override
  protected boolean abort(final MasterProcedureEnv env) {
    unlock(env);
    return true;
  }

  /**
   * Writes lock type, description, the lock target (regions, else namespace, else table) and
   * whether this is a local master lock (i.e. a latch was supplied).
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    final LockProcedureData.Builder builder = LockProcedureData.newBuilder()
        .setLockType(LockServiceProtos.LockType.valueOf(type.name()))
        .setDescription(description);
    if (regionInfos != null) {
      for (int i = 0; i < regionInfos.length; ++i) {
        builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i]));
      }
    } else if (namespace != null) {
      builder.setNamespace(namespace);
    } else if (tableName != null) {
      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
    }
    if (lockAcquireLatch != null) {
      builder.setIsMasterLock(true);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restores the fields written by {@link #serializeStateData(ProcedureStateSerializer)} and
   * rebuilds the underlying lock object.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    final LockProcedureData state = serializer.deserialize(LockProcedureData.class);
    type = LockType.valueOf(state.getLockType().name());
    description = state.getDescription();
    if (state.getRegionInfoCount() > 0) {
      regionInfos = new RegionInfo[state.getRegionInfoCount()];
      for (int i = 0; i < state.getRegionInfoCount(); ++i) {
        regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i));
      }
    } else if (state.hasNamespace()) {
      namespace = state.getNamespace();
    } else if (state.hasTableName()) {
      tableName = ProtobufUtil.toTableName(state.getTableName());
    }
    recoveredMasterLock = state.getIsMasterLock();
    this.lock = setupLock();
  }

  /**
   * Tries to take the underlying lock; on success records the result for clients and restarts
   * the heartbeat clock.
   */
  @Override
  protected LockState acquireLock(final MasterProcedureEnv env) {
    boolean ret = lock.acquireLock(env);
    locked.set(ret);
    if (ret) {
      LOG.debug("LOCKED {}", this);
      lastHeartBeat.set(System.currentTimeMillis());
      return LockState.LOCK_ACQUIRED;
    }
    LOG.warn("Failed acquire LOCK {}; YIELDING", this);
    return LockState.LOCK_EVENT_WAIT;
  }

  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    lock.releaseLock(env);
  }

  /**
   * On recovery, re-execute from start to acquire the locks.
   * Need to explicitly set it to RUNNABLE because the procedure might have been in WAITING_TIMEOUT
   * state when crash happened. In which case, it'll be sent back to timeout queue on recovery,
   * which we don't want since we want to reacquire locks.
   */
  @Override
  protected void beforeReplay(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
  }

  /** Appends the lock target (regions, else namespace, else table) and the lock type. */
  @Override
  protected void toStringClassDetails(final StringBuilder builder) {
    super.toStringClassDetails(builder);
    if (regionInfos != null) {
      builder.append(" regions=");
      for (int i = 0; i < regionInfos.length; ++i) {
        if (i > 0) {
          builder.append(",");
        }
        builder.append(regionInfos[i].getShortNameToLog());
      }
    } else if (namespace != null) {
      builder.append(", namespace=").append(namespace);
    } else if (tableName != null) {
      builder.append(", tableName=").append(tableName);
    }
    builder.append(", type=").append(type);
  }

  public LockType getType() {
    return type;
  }

  /**
   * Picks the lock implementation from whichever target field is set, checking regions first,
   * then namespace, then table.
   * @throws IllegalArgumentException if no target is set or the type is unsupported for it.
   */
  private LockInterface setupLock() throws IllegalArgumentException {
    if (regionInfos != null) {
      return setupRegionLock();
    } else if (namespace != null) {
      return setupNamespaceLock();
    } else if (tableName != null) {
      return setupTableLock();
    } else {
      LOG.error("Unknown level specified in {}", this);
      throw new IllegalArgumentException("no namespace/table/region provided");
    }
  }

  /** Namespace locks are exclusive only; also points tableName at the namespace table. */
  private LockInterface setupNamespaceLock() throws IllegalArgumentException {
    this.tableName = TableName.NAMESPACE_TABLE_NAME;
    switch (type) {
      case EXCLUSIVE:
        this.opType = TableOperationType.EDIT;
        return new NamespaceExclusiveLock();
      case SHARED:
        LOG.error("Shared lock on namespace not supported for {}", this);
        throw new IllegalArgumentException("Shared lock on namespace not supported");
      default:
        LOG.error("Unexpected lock type {}", this);
        throw new IllegalArgumentException("Wrong lock type: " + type.toString());
    }
  }

  /** Table locks may be exclusive (EDIT) or shared (READ). */
  private LockInterface setupTableLock() throws IllegalArgumentException {
    switch (type) {
      case EXCLUSIVE:
        this.opType = TableOperationType.EDIT;
        return new TableExclusiveLock();
      case SHARED:
        this.opType = TableOperationType.READ;
        return new TableSharedLock();
      default:
        LOG.error("Unexpected lock type {}", this);
        throw new IllegalArgumentException("Wrong lock type:" + type.toString());
    }
  }

  /** Region locks are exclusive only; also sets tableName to the table of the first region. */
  private LockInterface setupRegionLock() throws IllegalArgumentException {
    this.tableName = regionInfos[0].getTable();
    switch (type) {
      case EXCLUSIVE:
        this.opType = TableOperationType.REGION_EDIT;
        return new RegionExclusiveLock();
      default:
        LOG.error("Only exclusive lock supported on regions for {}", this);
        throw new IllegalArgumentException("Only exclusive lock supported on regions.");
    }
  }

  public String getDescription() {
    return description;
  }

  public boolean isLocked() {
    return locked.get();
  }

  /** This procedure always asks to hold its lock. */
  @Override
  public boolean holdLock(final MasterProcedureEnv env) {
    return true;
  }

  ///////////////////////
  // LOCK IMPLEMENTATIONS
  ///////////////////////

  private class TableExclusiveLock implements LockInterface {
    @Override
    public boolean acquireLock(final MasterProcedureEnv env) {
      // We invert return from waitTableExclusiveLock; it returns true if you HAVE TO WAIT
      // to get the lock and false if you don't; i.e. you got the lock.
      return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName);
    }

    @Override
    public void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName);
    }
  }

  private class TableSharedLock implements LockInterface {
    @Override
    public boolean acquireLock(final MasterProcedureEnv env) {
      // We invert return from waitTableSharedLock; it returns true if you HAVE TO WAIT
      // to get the lock and false if you don't; i.e. you got the lock.
      return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName);
    }

    @Override
    public void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName);
    }
  }

  private class NamespaceExclusiveLock implements LockInterface {
    @Override
    public boolean acquireLock(final MasterProcedureEnv env) {
      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
      // to get the lock and false if you don't; i.e. you got the lock.
      return !env.getProcedureScheduler().waitNamespaceExclusiveLock(
          LockProcedure.this, namespace);
    }

    @Override
    public void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureScheduler().wakeNamespaceExclusiveLock(
          LockProcedure.this, namespace);
    }
  }

  private class RegionExclusiveLock implements LockInterface {
    @Override
    public boolean acquireLock(final MasterProcedureEnv env) {
      // We invert return from waitRegions; it returns true if you HAVE TO WAIT
      // to get the lock and false if you don't; i.e. you got the lock.
      return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
    }

    @Override
    public void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
    }
  }
}