001/**
002
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master.locking;
021
022import java.io.IOException;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
032import org.apache.hadoop.hbase.procedure2.LockType;
033import org.apache.hadoop.hbase.procedure2.Procedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
035import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
044
045/**
046 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks
047 * used by the procedure framework isolating procedures doing creates/deletes etc. on
048 * table/namespace/regions.
049 * This procedure when scheduled, acquires specified locks, suspends itself and waits for:
050 * <ul>
051 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li>
052 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
053 * the lock or not based on last heartbeat timestamp.</li>
054 * </ul>
055 */
056@InterfaceAudience.Private
057public final class LockProcedure extends Procedure<MasterProcedureEnv>
058    implements TableProcedureInterface {
059  private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class);
060
061  public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000;  // timeout in ms
062  public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF =
063      "hbase.master.procedure.remote.locks.timeout.ms";
064  // 10 min. Same as old ZK lock timeout.
065  public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000;
066  public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF =
067      "hbase.master.procedure.local.master.locks.timeout.ms";
068
069  private String namespace;
070  private TableName tableName;
071  private RegionInfo[] regionInfos;
072  private LockType type;
073  // underlying namespace/table/region lock.
074  private LockInterface lock;
075  private TableOperationType opType;
076  private String description;
077  // True when recovery of master lock from WALs
078  private boolean recoveredMasterLock;
079
080  private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
081  // True if this proc acquired relevant locks. This value is for client checks.
082  private final AtomicBoolean locked = new AtomicBoolean(false);
083  // Last system time (in ms) when client sent the heartbeat.
084  // Initialize to system time for non-null value in case of recovery.
085  private final AtomicLong lastHeartBeat = new AtomicLong();
086  // Set to true when unlock request is received.
087  private final AtomicBoolean unlock = new AtomicBoolean(false);
088  // decreased when locks are acquired. Only used for local (with master process) purposes.
089  // Setting latch to non-null value increases default timeout to
090  // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat.
091  private final CountDownLatch lockAcquireLatch;
092
093  private volatile boolean suspended = false;
094
095  @Override
096  public TableName getTableName() {
097    return tableName;
098  }
099
100  @Override
101  public TableOperationType getTableOperationType() {
102    return opType;
103  }
104
105  private interface LockInterface {
106    boolean acquireLock(MasterProcedureEnv env);
107    void releaseLock(MasterProcedureEnv env);
108  }
109
110  public LockProcedure() {
111    lockAcquireLatch = null;
112  }
113
114  private LockProcedure(final Configuration conf, final LockType type,
115      final String description, final CountDownLatch lockAcquireLatch) {
116    this.type = type;
117    this.description = description;
118    this.lockAcquireLatch = lockAcquireLatch;
119    if (lockAcquireLatch == null) {
120      setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS));
121    } else {
122      setTimeout(conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF,
123          DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS));
124    }
125  }
126
127  /**
128   * Constructor for namespace lock.
129   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
130   */
131  public LockProcedure(final Configuration conf, final String namespace, final LockType type,
132      final String description, final CountDownLatch lockAcquireLatch)
133      throws IllegalArgumentException {
134    this(conf, type, description, lockAcquireLatch);
135
136    if (namespace.isEmpty()) {
137      throw new IllegalArgumentException("Empty namespace");
138    }
139
140    this.namespace = namespace;
141    this.lock = setupNamespaceLock();
142  }
143
144  /**
145   * Constructor for table lock.
146   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
147   */
148  public LockProcedure(final Configuration conf, final TableName tableName, final LockType type,
149      final String description, final CountDownLatch lockAcquireLatch)
150      throws IllegalArgumentException {
151    this(conf, type, description, lockAcquireLatch);
152
153    this.tableName = tableName;
154    this.lock = setupTableLock();
155  }
156
157  /**
158   * Constructor for region lock(s).
159   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
160   *                        Useful for locks acquired locally from master process.
161   * @throws IllegalArgumentException if all regions are not from same table.
162   */
163  public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos,
164      final LockType type, final String description, final CountDownLatch lockAcquireLatch)
165      throws IllegalArgumentException {
166    this(conf, type, description, lockAcquireLatch);
167
168    // Build RegionInfo from region names.
169    if (regionInfos.length == 0) {
170      throw new IllegalArgumentException("No regions specified for region lock");
171    }
172
173    // check all regions belong to same table.
174    final TableName regionTable = regionInfos[0].getTable();
175    for (int i = 1; i < regionInfos.length; ++i) {
176      if (!regionInfos[i].getTable().equals(regionTable)) {
177        throw new IllegalArgumentException("All regions should be from same table");
178      }
179    }
180
181    this.regionInfos = regionInfos;
182    this.lock = setupRegionLock();
183  }
184
185  private boolean hasHeartbeatExpired() {
186    return System.currentTimeMillis() - lastHeartBeat.get() >= getTimeout();
187  }
188
189  /**
190   * Updates timeout deadline for the lock.
191   */
192  public void updateHeartBeat() {
193    lastHeartBeat.set(System.currentTimeMillis());
194    if (LOG.isDebugEnabled()) {
195      LOG.debug("Heartbeat " + toString());
196    }
197  }
198
199  /**
200   * Re run the procedure after every timeout to write new WAL entries so we don't hold back old
201   * WALs.
202   * @return false, so procedure framework doesn't mark this procedure as failure.
203   */
204  @Override
205  protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) {
206    synchronized (event) {
207      if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event);
208      if (!event.isReady()) {  // Maybe unlock() awakened the event.
209        setState(ProcedureProtos.ProcedureState.RUNNABLE);
210        if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
211        event.wake(env.getProcedureScheduler());
212      }
213    }
214    return false;  // false: do not mark the procedure as failed.
215  }
216
217  // Can be called before procedure gets scheduled, in which case, the execute() will finish
218  // immediately and release the underlying locks.
219  public void unlock(final MasterProcedureEnv env) {
220    unlock.set(true);
221    locked.set(false);
222    // Maybe timeout already awakened the event and the procedure has finished.
223    synchronized (event) {
224      if (!event.isReady() && suspended) {
225        setState(ProcedureProtos.ProcedureState.RUNNABLE);
226        event.wake(env.getProcedureScheduler());
227        suspended = false;
228      }
229    }
230  }
231
232  @Override
233  protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env)
234  throws ProcedureSuspendedException {
235    // Local master locks don't store any state, so on recovery, simply finish this procedure
236    // immediately.
237    if (recoveredMasterLock) return null;
238    if (lockAcquireLatch != null) {
239      lockAcquireLatch.countDown();
240    }
241    if (unlock.get() || hasHeartbeatExpired()) {
242      locked.set(false);
243      LOG.debug((unlock.get()? "UNLOCKED " : "TIMED OUT ") + toString());
244      return null;
245    }
246    synchronized (event) {
247      event.suspend();
248      event.suspendIfNotReady(this);
249      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
250      suspended = true;
251    }
252    throw new ProcedureSuspendedException();
253  }
254
255  @Override
256  protected void rollback(final MasterProcedureEnv env) {
257    throw new UnsupportedOperationException();
258  }
259
260  @Override
261  protected boolean abort(final MasterProcedureEnv env) {
262    unlock(env);
263    return true;
264  }
265
266  @Override
267  protected void serializeStateData(ProcedureStateSerializer serializer)
268      throws IOException {
269    final LockProcedureData.Builder builder = LockProcedureData.newBuilder()
270          .setLockType(LockServiceProtos.LockType.valueOf(type.name()))
271          .setDescription(description);
272    if (regionInfos != null) {
273      for (int i = 0; i < regionInfos.length; ++i) {
274        builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i]));
275      }
276    } else if (namespace != null) {
277      builder.setNamespace(namespace);
278    } else if (tableName != null) {
279      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
280    }
281    if (lockAcquireLatch != null) {
282      builder.setIsMasterLock(true);
283    }
284    serializer.serialize(builder.build());
285  }
286
287  @Override
288  protected void deserializeStateData(ProcedureStateSerializer serializer)
289      throws IOException {
290    final LockProcedureData state = serializer.deserialize(LockProcedureData.class);
291    type = LockType.valueOf(state.getLockType().name());
292    description = state.getDescription();
293    if (state.getRegionInfoCount() > 0) {
294      regionInfos = new RegionInfo[state.getRegionInfoCount()];
295      for (int i = 0; i < state.getRegionInfoCount(); ++i) {
296        regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i));
297      }
298    } else if (state.hasNamespace()) {
299      namespace = state.getNamespace();
300    } else if (state.hasTableName()) {
301      tableName = ProtobufUtil.toTableName(state.getTableName());
302    }
303    recoveredMasterLock = state.getIsMasterLock();
304    this.lock = setupLock();
305  }
306
307  @Override
308  protected LockState acquireLock(final MasterProcedureEnv env) {
309    boolean ret = lock.acquireLock(env);
310    locked.set(ret);
311    if (ret) {
312      if (LOG.isDebugEnabled()) {
313        LOG.debug("LOCKED " + toString());
314      }
315      lastHeartBeat.set(System.currentTimeMillis());
316      return LockState.LOCK_ACQUIRED;
317    }
318    LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING");
319    return LockState.LOCK_EVENT_WAIT;
320  }
321
322  @Override
323  protected void releaseLock(final MasterProcedureEnv env) {
324    lock.releaseLock(env);
325  }
326
327  /**
328   * On recovery, re-execute from start to acquire the locks.
329   * Need to explicitly set it to RUNNABLE because the procedure might have been in WAITING_TIMEOUT
330   * state when crash happened. In which case, it'll be sent back to timeout queue on recovery,
331   * which we don't want since we want to require locks.
332   */
333  @Override
334  protected void beforeReplay(MasterProcedureEnv env) {
335    setState(ProcedureProtos.ProcedureState.RUNNABLE);
336  }
337
338  @Override
339  protected void toStringClassDetails(final StringBuilder builder) {
340    super.toStringClassDetails(builder);
341    if (regionInfos != null) {
342      builder.append(" regions=");
343      for (int i = 0; i < regionInfos.length; ++i) {
344        if (i > 0) builder.append(",");
345        builder.append(regionInfos[i].getShortNameToLog());
346      }
347    } else if (namespace != null) {
348      builder.append(", namespace=").append(namespace);
349    } else if (tableName != null) {
350      builder.append(", tableName=").append(tableName);
351    }
352    builder.append(", type=").append(type);
353  }
354
355  public LockType getType() {
356    return type;
357  }
358
359  private LockInterface setupLock() throws IllegalArgumentException {
360    if (regionInfos != null) {
361      return setupRegionLock();
362    } else if (namespace != null) {
363      return setupNamespaceLock();
364    } else if (tableName != null) {
365      return setupTableLock();
366    } else {
367      LOG.error("Unknown level specified in " + toString());
368      throw new IllegalArgumentException("no namespace/table/region provided");
369    }
370  }
371
372  private LockInterface setupNamespaceLock() throws IllegalArgumentException {
373    this.tableName = TableName.NAMESPACE_TABLE_NAME;
374    switch (type) {
375      case EXCLUSIVE:
376        this.opType = TableOperationType.EDIT;
377        return new NamespaceExclusiveLock();
378      case SHARED:
379        LOG.error("Shared lock on namespace not supported for " + toString());
380        throw new IllegalArgumentException("Shared lock on namespace not supported");
381      default:
382        LOG.error("Unexpected lock type " + toString());
383        throw new IllegalArgumentException("Wrong lock type: " + type.toString());
384    }
385  }
386
387  private LockInterface setupTableLock() throws IllegalArgumentException {
388    switch (type) {
389      case EXCLUSIVE:
390        this.opType = TableOperationType.EDIT;
391        return new TableExclusiveLock();
392      case SHARED:
393        this.opType = TableOperationType.READ;
394        return new TableSharedLock();
395      default:
396        LOG.error("Unexpected lock type " + toString());
397        throw new IllegalArgumentException("Wrong lock type:" + type.toString());
398    }
399  }
400
401  private LockInterface setupRegionLock() throws IllegalArgumentException {
402    this.tableName = regionInfos[0].getTable();
403    switch (type) {
404      case EXCLUSIVE:
405        this.opType = TableOperationType.REGION_EDIT;
406        return new RegionExclusiveLock();
407      default:
408        LOG.error("Only exclusive lock supported on regions for " + toString());
409        throw new IllegalArgumentException("Only exclusive lock supported on regions.");
410    }
411  }
412
413  public String getDescription() {
414    return description;
415  }
416
417  public boolean isLocked() {
418    return locked.get();
419  }
420
421  @Override
422  public boolean holdLock(final MasterProcedureEnv env) {
423    return true;
424  }
425
426  ///////////////////////
427  // LOCK IMPLEMENTATIONS
428  ///////////////////////
429
430  private class TableExclusiveLock implements LockInterface {
431    @Override
432    public boolean acquireLock(final MasterProcedureEnv env) {
433      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
434      // to get the lock and false if you don't; i.e. you got the lock.
435      return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName);
436    }
437
438    @Override
439    public void releaseLock(final MasterProcedureEnv env) {
440      env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName);
441    }
442  }
443
444  private class TableSharedLock implements LockInterface {
445    @Override
446    public boolean acquireLock(final MasterProcedureEnv env) {
447      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
448      // to get the lock and false if you don't; i.e. you got the lock.
449      return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName);
450    }
451
452    @Override
453    public void releaseLock(final MasterProcedureEnv env) {
454      env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName);
455    }
456  }
457
458  private class NamespaceExclusiveLock implements LockInterface {
459    @Override
460    public boolean acquireLock(final MasterProcedureEnv env) {
461      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
462      // to get the lock and false if you don't; i.e. you got the lock.
463      return !env.getProcedureScheduler().waitNamespaceExclusiveLock(
464          LockProcedure.this, namespace);
465    }
466
467    @Override
468    public void releaseLock(final MasterProcedureEnv env) {
469      env.getProcedureScheduler().wakeNamespaceExclusiveLock(
470          LockProcedure.this, namespace);
471    }
472  }
473
474  private class RegionExclusiveLock implements LockInterface {
475    @Override
476    public boolean acquireLock(final MasterProcedureEnv env) {
477      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
478      // to get the lock and false if you don't; i.e. you got the lock.
479      return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
480    }
481
482    @Override
483    public void releaseLock(final MasterProcedureEnv env) {
484      env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
485    }
486  }
487}