/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal cleaner that removes the completed procedure results after a TTL.
 * <p/>
 * NOTE: This is a special case handled in timeoutLoop().
 * <p/>
 * Since the client code looks more or less like:
 *
 * <pre>
 *   procId = master.doOperation()
 *   while (master.getProcResult(procId) == ProcInProgress);
 * </pre>
 *
 * The master should not throw away the proc result as soon as the procedure is done but should
 * wait for a result request from the client (see executor.removeResult(procId)). The client will
 * call something like master.isProcDone() or master.getProcResult() which will return the
 * result/state to the client, and it will mark the completed proc as ready to delete. Note that
 * the client may not receive the response from the master (e.g. master failover) so, if we delay
 * the real deletion of the proc result a bit, the client will be able to get the result on the
 * next try.
 */
@InterfaceAudience.Private
class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
  private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);

  /** Configuration key for the value handed to the superclass constructor as the interval. */
  static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
  private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec

  /** Configuration key for how many procedure ids are passed to a single store delete call. */
  private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
  private static final int DEFAULT_BATCH_SIZE = 32;

  private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
  private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
  private final ProcedureStore store;
  private final IdLock procExecutionLock;
  private final Configuration conf;

  /**
   * Creates the cleaner.
   * @param conf                  configuration the interval, the TTLs and the batch size are read
   *                              from
   * @param store                 store the evicted procedures are deleted from
   * @param procExecutionLock     per-procedure-id lock taken while an entry is examined
   * @param completedMap          map of completed procedures, keyed by procedure id; expired
   *                              entries are removed from it
   * @param nonceKeysToProcIdsMap nonce key to procedure id map; the nonce key of every evicted
   *                              procedure is removed from it
   */
  public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
    IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
    Map<NonceKey, Long> nonceKeysToProcIdsMap) {
    // set the timeout interval that triggers the periodic-procedure
    super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
    this.completed = completedMap;
    this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
    this.store = store;
    this.procExecutionLock = procExecutionLock;
    this.conf = conf;
  }

  /**
   * Walks the completed-procedure map while the store is running and evicts every entry whose
   * retainer reports it as expired. Evicted procedures other than {@link FailedProcedure}s are
   * deleted from the store in batches, then {@code store.cleanup()} is invoked.
   * @param env the procedure environment; not used by this chore
   * @throws UncheckedIOException if acquiring the per-procedure lock fails with an
   *                              {@link IOException}
   */
  @Override
  protected void periodicExecute(final TEnvironment env) {
    if (completed.isEmpty()) {
      LOG.trace("No completed procedures to cleanup.");
      return;
    }

    final long evictTtl =
      conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
    final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
      ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
    // A configured batch size of zero or less would make the array allocation below, or the first
    // write into the array, throw and abort every run of the cleaner; use a batch of one instead.
    final int batchSize = Math.max(1, conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE));

    final long[] batchIds = new long[batchSize];
    int batchCount = 0;

    final long now = EnvironmentEdgeManager.currentTime();
    final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
      completed.entrySet().iterator();
    while (it.hasNext() && store.isRunning()) {
      final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
      final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
      final Procedure<?> proc = retainer.getProcedure();
      final IdLock.Entry lockEntry;
      try {
        lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
      } catch (IOException e) {
        // can only happen if interrupted, so not a big deal to propagate it
        throw new UncheckedIOException(e);
      }
      try {
        // TODO: Select TTL based on Procedure type
        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
          // Failed procedures aren't persisted in WAL.
          if (!(proc instanceof FailedProcedure)) {
            batchIds[batchCount++] = entry.getKey();
            // Flush as soon as the batch is full so the array can be reused.
            if (batchCount == batchIds.length) {
              store.delete(batchIds, 0, batchCount);
              batchCount = 0;
            }
          }
          final NonceKey nonceKey = proc.getNonceKey();
          if (nonceKey != null) {
            nonceKeysToProcIdsMap.remove(nonceKey);
          }
          it.remove();
          LOG.trace("Evict completed {}", proc);
        }
      } finally {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }
    // Flush the last, partially filled batch.
    if (batchCount > 0) {
      store.delete(batchIds, 0, batchCount);
    }
    // let the store do some cleanup works, i.e, delete the place marker for preserving the max
    // procedure id.
    store.cleanup();
  }
}