001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.locking;
019
020import java.io.IOException;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
028import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
029import org.apache.hadoop.hbase.procedure2.LockType;
030import org.apache.hadoop.hbase.procedure2.Procedure;
031import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
043
044/**
045 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks
046 * used by the procedure framework isolating procedures doing creates/deletes etc. on
047 * table/namespace/regions. This procedure when scheduled, acquires specified locks, suspends itself
048 * and waits for:
049 * <ul>
050 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li>
051 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
052 * the lock or not based on last heartbeat timestamp.</li>
053 * </ul>
054 */
055@InterfaceAudience.Private
056public final class LockProcedure extends Procedure<MasterProcedureEnv>
057  implements TableProcedureInterface {
058  private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class);
059
060  public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000; // timeout in ms
061  public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF =
062    "hbase.master.procedure.remote.locks.timeout.ms";
063  // 10 min. Same as old ZK lock timeout.
064  public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000;
065  public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF =
066    "hbase.master.procedure.local.master.locks.timeout.ms";
067
068  private String namespace;
069  private TableName tableName;
070  private RegionInfo[] regionInfos;
071  private LockType type;
072  // underlying namespace/table/region lock.
073  private LockInterface lock;
074  private TableOperationType opType;
075  private String description;
076  // True when recovery of master lock from WALs
077  private boolean recoveredMasterLock;
078
079  private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
080  // True if this proc acquired relevant locks. This value is for client checks.
081  private final AtomicBoolean locked = new AtomicBoolean(false);
082  // Last system time (in ms) when client sent the heartbeat.
083  // Initialize to system time for non-null value in case of recovery.
084  private final AtomicLong lastHeartBeat = new AtomicLong();
085  // Set to true when unlock request is received.
086  private final AtomicBoolean unlock = new AtomicBoolean(false);
087  // decreased when locks are acquired. Only used for local (with master process) purposes.
088  // Setting latch to non-null value increases default timeout to
089  // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat.
090  private final CountDownLatch lockAcquireLatch;
091
092  private volatile boolean suspended = false;
093
094  @Override
095  public TableName getTableName() {
096    return tableName;
097  }
098
099  @Override
100  public TableOperationType getTableOperationType() {
101    return opType;
102  }
103
104  private interface LockInterface {
105    boolean acquireLock(MasterProcedureEnv env);
106
107    void releaseLock(MasterProcedureEnv env);
108  }
109
110  public LockProcedure() {
111    lockAcquireLatch = null;
112  }
113
114  private LockProcedure(final Configuration conf, final LockType type, final String description,
115    final CountDownLatch lockAcquireLatch) {
116    this.type = type;
117    this.description = description;
118    this.lockAcquireLatch = lockAcquireLatch;
119    if (lockAcquireLatch == null) {
120      setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS));
121    } else {
122      setTimeout(
123        conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS));
124    }
125  }
126
127  /**
128   * Constructor for namespace lock.
129   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
130   */
131  public LockProcedure(final Configuration conf, final String namespace, final LockType type,
132    final String description, final CountDownLatch lockAcquireLatch)
133    throws IllegalArgumentException {
134    this(conf, type, description, lockAcquireLatch);
135
136    if (namespace.isEmpty()) {
137      throw new IllegalArgumentException("Empty namespace");
138    }
139
140    this.namespace = namespace;
141    this.lock = setupNamespaceLock();
142  }
143
144  /**
145   * Constructor for table lock.
146   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
147   */
148  public LockProcedure(final Configuration conf, final TableName tableName, final LockType type,
149    final String description, final CountDownLatch lockAcquireLatch)
150    throws IllegalArgumentException {
151    this(conf, type, description, lockAcquireLatch);
152
153    this.tableName = tableName;
154    this.lock = setupTableLock();
155  }
156
157  /**
158   * Constructor for region lock(s).
159   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired. Useful for
160   *                         locks acquired locally from master process.
161   * @throws IllegalArgumentException if all regions are not from same table.
162   */
163  public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos,
164    final LockType type, final String description, final CountDownLatch lockAcquireLatch)
165    throws IllegalArgumentException {
166    this(conf, type, description, lockAcquireLatch);
167
168    // Build RegionInfo from region names.
169    if (regionInfos.length == 0) {
170      throw new IllegalArgumentException("No regions specified for region lock");
171    }
172
173    // check all regions belong to same table.
174    final TableName regionTable = regionInfos[0].getTable();
175    for (int i = 1; i < regionInfos.length; ++i) {
176      if (!regionInfos[i].getTable().equals(regionTable)) {
177        throw new IllegalArgumentException("All regions should be from same table");
178      }
179    }
180
181    this.regionInfos = regionInfos;
182    this.lock = setupRegionLock();
183  }
184
185  private boolean hasHeartbeatExpired() {
186    return EnvironmentEdgeManager.currentTime() - lastHeartBeat.get() >= getTimeout();
187  }
188
189  /**
190   * Updates timeout deadline for the lock.
191   */
192  public void updateHeartBeat() {
193    lastHeartBeat.set(EnvironmentEdgeManager.currentTime());
194    if (LOG.isDebugEnabled()) {
195      LOG.debug("Heartbeat " + toString());
196    }
197  }
198
199  /**
200   * Re run the procedure after every timeout to write new WAL entries so we don't hold back old
201   * WALs.
202   * @return false, so procedure framework doesn't mark this procedure as failure.
203   */
204  @Override
205  protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) {
206    synchronized (event) {
207      if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event);
208      if (!event.isReady()) { // Maybe unlock() awakened the event.
209        setState(ProcedureProtos.ProcedureState.RUNNABLE);
210        if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
211        event.wake(env.getProcedureScheduler());
212      }
213    }
214    return false; // false: do not mark the procedure as failed.
215  }
216
217  // Can be called before procedure gets scheduled, in which case, the execute() will finish
218  // immediately and release the underlying locks.
219  public void unlock(final MasterProcedureEnv env) {
220    unlock.set(true);
221    locked.set(false);
222    // Maybe timeout already awakened the event and the procedure has finished.
223    synchronized (event) {
224      if (!event.isReady() && suspended) {
225        setState(ProcedureProtos.ProcedureState.RUNNABLE);
226        event.wake(env.getProcedureScheduler());
227        suspended = false;
228      }
229    }
230  }
231
232  @Override
233  protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env)
234    throws ProcedureSuspendedException {
235    // Local master locks don't store any state, so on recovery, simply finish this procedure
236    // immediately.
237    if (recoveredMasterLock) return null;
238    if (lockAcquireLatch != null) {
239      lockAcquireLatch.countDown();
240    }
241    if (unlock.get() || hasHeartbeatExpired()) {
242      locked.set(false);
243      LOG.debug((unlock.get() ? "UNLOCKED " : "TIMED OUT ") + toString());
244      return null;
245    }
246    synchronized (event) {
247      event.suspend();
248      event.suspendIfNotReady(this);
249      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
250      suspended = true;
251    }
252    throw new ProcedureSuspendedException();
253  }
254
255  @Override
256  protected void rollback(final MasterProcedureEnv env) {
257    throw new UnsupportedOperationException();
258  }
259
260  @Override
261  protected boolean abort(final MasterProcedureEnv env) {
262    unlock(env);
263    return true;
264  }
265
266  @Override
267  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
268    final LockProcedureData.Builder builder = LockProcedureData.newBuilder()
269      .setLockType(LockServiceProtos.LockType.valueOf(type.name())).setDescription(description);
270    if (regionInfos != null) {
271      for (int i = 0; i < regionInfos.length; ++i) {
272        builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i]));
273      }
274    } else if (namespace != null) {
275      builder.setNamespace(namespace);
276    } else if (tableName != null) {
277      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
278    }
279    if (lockAcquireLatch != null) {
280      builder.setIsMasterLock(true);
281    }
282    serializer.serialize(builder.build());
283  }
284
285  @Override
286  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
287    final LockProcedureData state = serializer.deserialize(LockProcedureData.class);
288    type = LockType.valueOf(state.getLockType().name());
289    description = state.getDescription();
290    if (state.getRegionInfoCount() > 0) {
291      regionInfos = new RegionInfo[state.getRegionInfoCount()];
292      for (int i = 0; i < state.getRegionInfoCount(); ++i) {
293        regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i));
294      }
295    } else if (state.hasNamespace()) {
296      namespace = state.getNamespace();
297    } else if (state.hasTableName()) {
298      tableName = ProtobufUtil.toTableName(state.getTableName());
299    }
300    recoveredMasterLock = state.getIsMasterLock();
301    this.lock = setupLock();
302  }
303
304  @Override
305  protected LockState acquireLock(final MasterProcedureEnv env) {
306    boolean ret = lock.acquireLock(env);
307    locked.set(ret);
308    if (ret) {
309      if (LOG.isDebugEnabled()) {
310        LOG.debug("LOCKED " + toString());
311      }
312      lastHeartBeat.set(EnvironmentEdgeManager.currentTime());
313      return LockState.LOCK_ACQUIRED;
314    }
315    LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING");
316    return LockState.LOCK_EVENT_WAIT;
317  }
318
319  @Override
320  protected void releaseLock(final MasterProcedureEnv env) {
321    lock.releaseLock(env);
322  }
323
324  /**
325   * On recovery, re-execute from start to acquire the locks. Need to explicitly set it to RUNNABLE
326   * because the procedure might have been in WAITING_TIMEOUT state when crash happened. In which
327   * case, it'll be sent back to timeout queue on recovery, which we don't want since we want to
328   * require locks.
329   */
330  @Override
331  protected void beforeReplay(MasterProcedureEnv env) {
332    setState(ProcedureProtos.ProcedureState.RUNNABLE);
333  }
334
335  @Override
336  protected void toStringClassDetails(final StringBuilder builder) {
337    super.toStringClassDetails(builder);
338    if (regionInfos != null) {
339      builder.append(" regions=");
340      for (int i = 0; i < regionInfos.length; ++i) {
341        if (i > 0) builder.append(",");
342        builder.append(regionInfos[i].getShortNameToLog());
343      }
344    } else if (namespace != null) {
345      builder.append(", namespace=").append(namespace);
346    } else if (tableName != null) {
347      builder.append(", tableName=").append(tableName);
348    }
349    builder.append(", type=").append(type);
350  }
351
352  public LockType getType() {
353    return type;
354  }
355
356  private LockInterface setupLock() throws IllegalArgumentException {
357    if (regionInfos != null) {
358      return setupRegionLock();
359    } else if (namespace != null) {
360      return setupNamespaceLock();
361    } else if (tableName != null) {
362      return setupTableLock();
363    } else {
364      LOG.error("Unknown level specified in " + toString());
365      throw new IllegalArgumentException("no namespace/table/region provided");
366    }
367  }
368
369  private LockInterface setupNamespaceLock() throws IllegalArgumentException {
370    this.tableName = TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME;
371    switch (type) {
372      case EXCLUSIVE:
373        this.opType = TableOperationType.EDIT;
374        return new NamespaceExclusiveLock();
375      case SHARED:
376        LOG.error("Shared lock on namespace not supported for " + toString());
377        throw new IllegalArgumentException("Shared lock on namespace not supported");
378      default:
379        LOG.error("Unexpected lock type " + toString());
380        throw new IllegalArgumentException("Wrong lock type: " + type.toString());
381    }
382  }
383
384  private LockInterface setupTableLock() throws IllegalArgumentException {
385    switch (type) {
386      case EXCLUSIVE:
387        this.opType = TableOperationType.EDIT;
388        return new TableExclusiveLock();
389      case SHARED:
390        this.opType = TableOperationType.READ;
391        return new TableSharedLock();
392      default:
393        LOG.error("Unexpected lock type " + toString());
394        throw new IllegalArgumentException("Wrong lock type:" + type.toString());
395    }
396  }
397
398  private LockInterface setupRegionLock() throws IllegalArgumentException {
399    this.tableName = regionInfos[0].getTable();
400    switch (type) {
401      case EXCLUSIVE:
402        this.opType = TableOperationType.REGION_EDIT;
403        return new RegionExclusiveLock();
404      default:
405        LOG.error("Only exclusive lock supported on regions for " + toString());
406        throw new IllegalArgumentException("Only exclusive lock supported on regions.");
407    }
408  }
409
410  public String getDescription() {
411    return description;
412  }
413
414  public boolean isLocked() {
415    return locked.get();
416  }
417
418  @Override
419  public boolean holdLock(final MasterProcedureEnv env) {
420    return true;
421  }
422
423  ///////////////////////
424  // LOCK IMPLEMENTATIONS
425  ///////////////////////
426
427  private class TableExclusiveLock implements LockInterface {
428    @Override
429    public boolean acquireLock(final MasterProcedureEnv env) {
430      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
431      // to get the lock and false if you don't; i.e. you got the lock.
432      return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName);
433    }
434
435    @Override
436    public void releaseLock(final MasterProcedureEnv env) {
437      env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName);
438    }
439  }
440
441  private class TableSharedLock implements LockInterface {
442    @Override
443    public boolean acquireLock(final MasterProcedureEnv env) {
444      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
445      // to get the lock and false if you don't; i.e. you got the lock.
446      return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName);
447    }
448
449    @Override
450    public void releaseLock(final MasterProcedureEnv env) {
451      env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName);
452    }
453  }
454
455  private class NamespaceExclusiveLock implements LockInterface {
456    @Override
457    public boolean acquireLock(final MasterProcedureEnv env) {
458      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
459      // to get the lock and false if you don't; i.e. you got the lock.
460      return !env.getProcedureScheduler().waitNamespaceExclusiveLock(LockProcedure.this, namespace);
461    }
462
463    @Override
464    public void releaseLock(final MasterProcedureEnv env) {
465      env.getProcedureScheduler().wakeNamespaceExclusiveLock(LockProcedure.this, namespace);
466    }
467  }
468
469  private class RegionExclusiveLock implements LockInterface {
470    @Override
471    public boolean acquireLock(final MasterProcedureEnv env) {
472      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
473      // to get the lock and false if you don't; i.e. you got the lock.
474      return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
475    }
476
477    @Override
478    public void releaseLock(final MasterProcedureEnv env) {
479      env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
480    }
481  }
482}