001/**
002
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master.locking;
021
022import java.io.IOException;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
032import org.apache.hadoop.hbase.procedure2.LockType;
033import org.apache.hadoop.hbase.procedure2.Procedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
035import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
044
045/**
046 * Procedure to allow blessed clients and external admin tools to take our internal Schema locks
047 * used by the procedure framework isolating procedures doing creates/deletes etc. on
048 * table/namespace/regions.
049 * This procedure when scheduled, acquires specified locks, suspends itself and waits for:
050 * <ul>
051 * <li>Call to unlock: if lock request came from the process itself, say master chore.</li>
052 * <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
053 * the lock or not based on last heartbeat timestamp.</li>
054 * </ul>
055 */
056@InterfaceAudience.Private
057public final class LockProcedure extends Procedure<MasterProcedureEnv>
058    implements TableProcedureInterface {
059  private static final Logger LOG = LoggerFactory.getLogger(LockProcedure.class);
060
061  public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000;  // timeout in ms
062  public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF =
063      "hbase.master.procedure.remote.locks.timeout.ms";
064  // 10 min. Same as old ZK lock timeout.
065  public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000;
066  public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF =
067      "hbase.master.procedure.local.master.locks.timeout.ms";
068
069  private String namespace;
070  private TableName tableName;
071  private RegionInfo[] regionInfos;
072  private LockType type;
073  // underlying namespace/table/region lock.
074  private LockInterface lock;
075  private TableOperationType opType;
076  private String description;
077  // True when recovery of master lock from WALs
078  private boolean recoveredMasterLock;
079
080  private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
081  // True if this proc acquired relevant locks. This value is for client checks.
082  private final AtomicBoolean locked = new AtomicBoolean(false);
083  // Last system time (in ms) when client sent the heartbeat.
084  // Initialize to system time for non-null value in case of recovery.
085  private final AtomicLong lastHeartBeat = new AtomicLong();
086  // Set to true when unlock request is received.
087  private final AtomicBoolean unlock = new AtomicBoolean(false);
088  // decreased when locks are acquired. Only used for local (with master process) purposes.
089  // Setting latch to non-null value increases default timeout to
090  // DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat.
091  private final CountDownLatch lockAcquireLatch;
092
093  @Override
094  public TableName getTableName() {
095    return tableName;
096  }
097
098  @Override
099  public TableOperationType getTableOperationType() {
100    return opType;
101  }
102
103  private interface LockInterface {
104    boolean acquireLock(MasterProcedureEnv env);
105    void releaseLock(MasterProcedureEnv env);
106  }
107
108  public LockProcedure() {
109    lockAcquireLatch = null;
110  }
111
112  private LockProcedure(final Configuration conf, final LockType type,
113      final String description, final CountDownLatch lockAcquireLatch) {
114    this.type = type;
115    this.description = description;
116    this.lockAcquireLatch = lockAcquireLatch;
117    if (lockAcquireLatch == null) {
118      setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS));
119    } else {
120      setTimeout(conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF,
121          DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS));
122    }
123  }
124
125  /**
126   * Constructor for namespace lock.
127   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
128   */
129  public LockProcedure(final Configuration conf, final String namespace, final LockType type,
130      final String description, final CountDownLatch lockAcquireLatch)
131      throws IllegalArgumentException {
132    this(conf, type, description, lockAcquireLatch);
133
134    if (namespace.isEmpty()) {
135      throw new IllegalArgumentException("Empty namespace");
136    }
137
138    this.namespace = namespace;
139    this.lock = setupNamespaceLock();
140  }
141
142  /**
143   * Constructor for table lock.
144   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
145   */
146  public LockProcedure(final Configuration conf, final TableName tableName, final LockType type,
147      final String description, final CountDownLatch lockAcquireLatch)
148      throws IllegalArgumentException {
149    this(conf, type, description, lockAcquireLatch);
150
151    this.tableName = tableName;
152    this.lock = setupTableLock();
153  }
154
155  /**
156   * Constructor for region lock(s).
157   * @param lockAcquireLatch if not null, the latch is decreased when lock is acquired.
158   *                        Useful for locks acquired locally from master process.
159   * @throws IllegalArgumentException if all regions are not from same table.
160   */
161  public LockProcedure(final Configuration conf, final RegionInfo[] regionInfos,
162      final LockType type, final String description, final CountDownLatch lockAcquireLatch)
163      throws IllegalArgumentException {
164    this(conf, type, description, lockAcquireLatch);
165
166    // Build RegionInfo from region names.
167    if (regionInfos.length == 0) {
168      throw new IllegalArgumentException("No regions specified for region lock");
169    }
170
171    // check all regions belong to same table.
172    final TableName regionTable = regionInfos[0].getTable();
173    for (int i = 1; i < regionInfos.length; ++i) {
174      if (!regionInfos[i].getTable().equals(regionTable)) {
175        throw new IllegalArgumentException("All regions should be from same table");
176      }
177    }
178
179    this.regionInfos = regionInfos;
180    this.lock = setupRegionLock();
181  }
182
183  private boolean hasHeartbeatExpired() {
184    return System.currentTimeMillis() - lastHeartBeat.get() >= getTimeout();
185  }
186
187  /**
188   * Updates timeout deadline for the lock.
189   */
190  public void updateHeartBeat() {
191    lastHeartBeat.set(System.currentTimeMillis());
192    if (LOG.isDebugEnabled()) {
193      LOG.debug("Heartbeat " + toString());
194    }
195  }
196
197  /**
198   * Re run the procedure after every timeout to write new WAL entries so we don't hold back old
199   * WALs.
200   * @return false, so procedure framework doesn't mark this procedure as failure.
201   */
202  @Override
203  protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) {
204    synchronized (event) {
205      if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event);
206      if (!event.isReady()) {  // Maybe unlock() awakened the event.
207        setState(ProcedureProtos.ProcedureState.RUNNABLE);
208        if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
209        event.wake(env.getProcedureScheduler());
210      }
211    }
212    return false;  // false: do not mark the procedure as failed.
213  }
214
215  // Can be called before procedure gets scheduled, in which case, the execute() will finish
216  // immediately and release the underlying locks.
217  public void unlock(final MasterProcedureEnv env) {
218    unlock.set(true);
219    locked.set(false);
220    // Maybe timeout already awakened the event and the procedure has finished.
221    synchronized (event) {
222      if (!event.isReady()) {
223        setState(ProcedureProtos.ProcedureState.RUNNABLE);
224        event.wake(env.getProcedureScheduler());
225      }
226    }
227  }
228
229  @Override
230  protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env)
231  throws ProcedureSuspendedException {
232    // Local master locks don't store any state, so on recovery, simply finish this procedure
233    // immediately.
234    if (recoveredMasterLock) return null;
235    if (lockAcquireLatch != null) {
236      lockAcquireLatch.countDown();
237    }
238    if (unlock.get() || hasHeartbeatExpired()) {
239      locked.set(false);
240      LOG.debug((unlock.get()? "UNLOCKED " : "TIMED OUT ") + toString());
241      return null;
242    }
243    synchronized (event) {
244      event.suspend();
245      event.suspendIfNotReady(this);
246      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
247    }
248    throw new ProcedureSuspendedException();
249  }
250
251  @Override
252  protected void rollback(final MasterProcedureEnv env) {
253    throw new UnsupportedOperationException();
254  }
255
256  @Override
257  protected boolean abort(final MasterProcedureEnv env) {
258    unlock(env);
259    return true;
260  }
261
262  @Override
263  protected void serializeStateData(ProcedureStateSerializer serializer)
264      throws IOException {
265    final LockProcedureData.Builder builder = LockProcedureData.newBuilder()
266          .setLockType(LockServiceProtos.LockType.valueOf(type.name()))
267          .setDescription(description);
268    if (regionInfos != null) {
269      for (int i = 0; i < regionInfos.length; ++i) {
270        builder.addRegionInfo(ProtobufUtil.toRegionInfo(regionInfos[i]));
271      }
272    } else if (namespace != null) {
273      builder.setNamespace(namespace);
274    } else if (tableName != null) {
275      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
276    }
277    if (lockAcquireLatch != null) {
278      builder.setIsMasterLock(true);
279    }
280    serializer.serialize(builder.build());
281  }
282
283  @Override
284  protected void deserializeStateData(ProcedureStateSerializer serializer)
285      throws IOException {
286    final LockProcedureData state = serializer.deserialize(LockProcedureData.class);
287    type = LockType.valueOf(state.getLockType().name());
288    description = state.getDescription();
289    if (state.getRegionInfoCount() > 0) {
290      regionInfos = new RegionInfo[state.getRegionInfoCount()];
291      for (int i = 0; i < state.getRegionInfoCount(); ++i) {
292        regionInfos[i] = ProtobufUtil.toRegionInfo(state.getRegionInfo(i));
293      }
294    } else if (state.hasNamespace()) {
295      namespace = state.getNamespace();
296    } else if (state.hasTableName()) {
297      tableName = ProtobufUtil.toTableName(state.getTableName());
298    }
299    recoveredMasterLock = state.getIsMasterLock();
300    this.lock = setupLock();
301  }
302
303  @Override
304  protected LockState acquireLock(final MasterProcedureEnv env) {
305    boolean ret = lock.acquireLock(env);
306    locked.set(ret);
307    if (ret) {
308      if (LOG.isDebugEnabled()) {
309        LOG.debug("LOCKED " + toString());
310      }
311      lastHeartBeat.set(System.currentTimeMillis());
312      return LockState.LOCK_ACQUIRED;
313    }
314    LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING");
315    return LockState.LOCK_EVENT_WAIT;
316  }
317
318  @Override
319  protected void releaseLock(final MasterProcedureEnv env) {
320    lock.releaseLock(env);
321  }
322
323  /**
324   * On recovery, re-execute from start to acquire the locks.
325   * Need to explicitly set it to RUNNABLE because the procedure might have been in WAITING_TIMEOUT
326   * state when crash happened. In which case, it'll be sent back to timeout queue on recovery,
327   * which we don't want since we want to require locks.
328   */
329  @Override
330  protected void beforeReplay(MasterProcedureEnv env) {
331    setState(ProcedureProtos.ProcedureState.RUNNABLE);
332  }
333
334  @Override
335  protected void toStringClassDetails(final StringBuilder builder) {
336    super.toStringClassDetails(builder);
337    if (regionInfos != null) {
338      builder.append(" regions=");
339      for (int i = 0; i < regionInfos.length; ++i) {
340        if (i > 0) builder.append(",");
341        builder.append(regionInfos[i].getShortNameToLog());
342      }
343    } else if (namespace != null) {
344      builder.append(", namespace=").append(namespace);
345    } else if (tableName != null) {
346      builder.append(", tableName=").append(tableName);
347    }
348    builder.append(", type=").append(type);
349  }
350
351  public LockType getType() {
352    return type;
353  }
354
355  private LockInterface setupLock() throws IllegalArgumentException {
356    if (regionInfos != null) {
357      return setupRegionLock();
358    } else if (namespace != null) {
359      return setupNamespaceLock();
360    } else if (tableName != null) {
361      return setupTableLock();
362    } else {
363      LOG.error("Unknown level specified in " + toString());
364      throw new IllegalArgumentException("no namespace/table/region provided");
365    }
366  }
367
368  private LockInterface setupNamespaceLock() throws IllegalArgumentException {
369    this.tableName = TableName.NAMESPACE_TABLE_NAME;
370    switch (type) {
371      case EXCLUSIVE:
372        this.opType = TableOperationType.EDIT;
373        return new NamespaceExclusiveLock();
374      case SHARED:
375        LOG.error("Shared lock on namespace not supported for " + toString());
376        throw new IllegalArgumentException("Shared lock on namespace not supported");
377      default:
378        LOG.error("Unexpected lock type " + toString());
379        throw new IllegalArgumentException("Wrong lock type: " + type.toString());
380    }
381  }
382
383  private LockInterface setupTableLock() throws IllegalArgumentException {
384    switch (type) {
385      case EXCLUSIVE:
386        this.opType = TableOperationType.EDIT;
387        return new TableExclusiveLock();
388      case SHARED:
389        this.opType = TableOperationType.READ;
390        return new TableSharedLock();
391      default:
392        LOG.error("Unexpected lock type " + toString());
393        throw new IllegalArgumentException("Wrong lock type:" + type.toString());
394    }
395  }
396
397  private LockInterface setupRegionLock() throws IllegalArgumentException {
398    this.tableName = regionInfos[0].getTable();
399    switch (type) {
400      case EXCLUSIVE:
401        this.opType = TableOperationType.REGION_EDIT;
402        return new RegionExclusiveLock();
403      default:
404        LOG.error("Only exclusive lock supported on regions for " + toString());
405        throw new IllegalArgumentException("Only exclusive lock supported on regions.");
406    }
407  }
408
409  public String getDescription() {
410    return description;
411  }
412
413  public boolean isLocked() {
414    return locked.get();
415  }
416
417  @Override
418  public boolean holdLock(final MasterProcedureEnv env) {
419    return true;
420  }
421
422  ///////////////////////
423  // LOCK IMPLEMENTATIONS
424  ///////////////////////
425
426  private class TableExclusiveLock implements LockInterface {
427    @Override
428    public boolean acquireLock(final MasterProcedureEnv env) {
429      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
430      // to get the lock and false if you don't; i.e. you got the lock.
431      return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName);
432    }
433
434    @Override
435    public void releaseLock(final MasterProcedureEnv env) {
436      env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName);
437    }
438  }
439
440  private class TableSharedLock implements LockInterface {
441    @Override
442    public boolean acquireLock(final MasterProcedureEnv env) {
443      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
444      // to get the lock and false if you don't; i.e. you got the lock.
445      return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName);
446    }
447
448    @Override
449    public void releaseLock(final MasterProcedureEnv env) {
450      env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName);
451    }
452  }
453
454  private class NamespaceExclusiveLock implements LockInterface {
455    @Override
456    public boolean acquireLock(final MasterProcedureEnv env) {
457      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
458      // to get the lock and false if you don't; i.e. you got the lock.
459      return !env.getProcedureScheduler().waitNamespaceExclusiveLock(
460          LockProcedure.this, namespace);
461    }
462
463    @Override
464    public void releaseLock(final MasterProcedureEnv env) {
465      env.getProcedureScheduler().wakeNamespaceExclusiveLock(
466          LockProcedure.this, namespace);
467    }
468  }
469
470  private class RegionExclusiveLock implements LockInterface {
471    @Override
472    public boolean acquireLock(final MasterProcedureEnv env) {
473      // We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
474      // to get the lock and false if you don't; i.e. you got the lock.
475      return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
476    }
477
478    @Override
479    public void releaseLock(final MasterProcedureEnv env) {
480      env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
481    }
482  }
483}