001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.Iterator;
023import java.util.Map;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.IdLock;
028import org.apache.hadoop.hbase.util.NonceKey;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Internal cleaner that removes the completed procedure results after a TTL.
035 * <p/>
036 * NOTE: This is a special case handled in timeoutLoop().
037 * <p/>
038 * Since the client code looks more or less like:
039 *
040 * <pre>
041 *   procId = master.doOperation()
042 *   while (master.getProcResult(procId) == ProcInProgress);
043 * </pre>
044 *
045 * The master should not throw away the proc result as soon as the procedure is done but should wait
046 * a result request from the client (see executor.removeResult(procId)) The client will call
047 * something like master.isProcDone() or master.getProcResult() which will return the result/state
048 * to the client, and it will mark the completed proc as ready to delete. note that the client may
049 * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
050 * deletion of the proc result the client will be able to get the result the next try.
051 */
052@InterfaceAudience.Private
053class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
054  private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
055
056  static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
057  private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
058
059  private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
060  private static final int DEFAULT_BATCH_SIZE = 32;
061
062  private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
063  private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
064  private final ProcedureStore store;
065  private final IdLock procExecutionLock;
066  private Configuration conf;
067
068  public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
069    IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
070    Map<NonceKey, Long> nonceKeysToProcIdsMap) {
071    // set the timeout interval that triggers the periodic-procedure
072    super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
073    this.completed = completedMap;
074    this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
075    this.store = store;
076    this.procExecutionLock = procExecutionLock;
077    this.conf = conf;
078  }
079
080  @Override
081  protected void periodicExecute(final TEnvironment env) {
082    if (completed.isEmpty()) {
083      if (LOG.isTraceEnabled()) {
084        LOG.trace("No completed procedures to cleanup.");
085      }
086      return;
087    }
088
089    final long evictTtl =
090      conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
091    final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
092      ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
093    final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
094
095    final long[] batchIds = new long[batchSize];
096    int batchCount = 0;
097
098    final long now = EnvironmentEdgeManager.currentTime();
099    final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
100      completed.entrySet().iterator();
101    while (it.hasNext() && store.isRunning()) {
102      final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
103      final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
104      final Procedure<?> proc = retainer.getProcedure();
105      IdLock.Entry lockEntry;
106      try {
107        lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
108      } catch (IOException e) {
109        // can only happen if interrupted, so not a big deal to propagate it
110        throw new UncheckedIOException(e);
111      }
112      try {
113        // TODO: Select TTL based on Procedure type
114        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
115          // Failed procedures aren't persisted in WAL.
116          if (!(proc instanceof FailedProcedure)) {
117            batchIds[batchCount++] = entry.getKey();
118            if (batchCount == batchIds.length) {
119              store.delete(batchIds, 0, batchCount);
120              batchCount = 0;
121            }
122          }
123          final NonceKey nonceKey = proc.getNonceKey();
124          if (nonceKey != null) {
125            nonceKeysToProcIdsMap.remove(nonceKey);
126          }
127          it.remove();
128          LOG.trace("Evict completed {}", proc);
129        }
130      } finally {
131        procExecutionLock.releaseLockEntry(lockEntry);
132      }
133    }
134    if (batchCount > 0) {
135      store.delete(batchIds, 0, batchCount);
136    }
137    // let the store do some cleanup works, i.e, delete the place marker for preserving the max
138    // procedure id.
139    store.cleanup();
140  }
141}