/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.util.concurrent.DelayQueue;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs tasks on a period, such as the check for stuck workers.
 * <p>
 * Tasks are held in a {@link DelayQueue}; while the owning executor reports that it is running,
 * this thread takes the next task from the queue and dispatches it according to its type
 * ({@link InlineChore} or {@link DelayedProcedure}).
 * @see InlineChore
 */
@InterfaceAudience.Private
class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

  private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);

  /** Executor whose running state, environment, store and scheduler this thread uses. */
  private final ProcedureExecutor<TEnvironment> executor;

  /** Pending chores and procedures, handed out by {@link #run()}. */
  private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

  /**
   * Creates the thread as a daemon named "ProcExecTimeout" inside the given thread group.
   * @param executor the executor this thread works for
   * @param group the thread group passed to the superclass constructor
   */
  public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
    super(group, "ProcExecTimeout");
    setDaemon(true);
    this.executor = executor;
  }

  /**
   * Enqueues the poison marker so that a {@link #run()} loop waiting on the queue gets an item
   * and re-evaluates whether the executor is still running.
   */
  @Override
  public void sendStopSignal() {
    queue.add(DelayedUtil.DELAYED_POISON);
  }

  /**
   * Main loop: keeps taking tasks from the queue for as long as the executor is running.
   * A {@code null} result or the poison marker is skipped, since the executor may be shutting
   * down and the item is then just the shutdown request.
   */
  @Override
  public void run() {
    while (executor.isRunning()) {
      final DelayedWithTimeout next = DelayedUtil.takeWithoutInterrupt(queue);
      final boolean isRealTask = next != null && next != DelayedUtil.DELAYED_POISON;
      if (isRealTask) {
        dispatch(next);
      }
    }
  }

  /**
   * Logs the task at trace level and routes it to the handler matching its runtime type.
   * A task of any other type is reported as a code bug and dropped.
   */
  private void dispatch(DelayedWithTimeout task) {
    LOG.trace("Executing {}", task);

    if (task instanceof InlineChore) {
      execInlineChore((InlineChore) task);
      return;
    }
    if (task instanceof DelayedProcedure) {
      execDelayedProcedure((DelayedProcedure<TEnvironment>) task);
      return;
    }
    LOG.error("CODE-BUG unknown timeout task type {}", task);
  }

  /**
   * Refreshes the chore's timeout and puts the chore on the queue.
   * @param chore the chore to schedule
   */
  public void add(InlineChore chore) {
    chore.refreshTimeout();
    queue.add(chore);
  }

  /**
   * Logs the procedure together with its timeout and timeout timestamp, then queues it wrapped
   * in a {@link DelayedProcedure}.
   * @param procedure the procedure to queue
   */
  public void add(Procedure<TEnvironment> procedure) {
    LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
      procedure.getTimeoutTimestamp());
    final DelayedProcedure<TEnvironment> wrapped = new DelayedProcedure<>(procedure);
    queue.add(wrapped);
  }

  /**
   * Asks the queue to remove an entry equal to a fresh {@link DelayedProcedure} wrapper around
   * the given procedure.
   * @param procedure the procedure to remove
   * @return the result of {@link DelayQueue#remove(Object)}
   */
  public boolean remove(Procedure<TEnvironment> procedure) {
    final DelayedProcedure<TEnvironment> probe = new DelayedProcedure<>(procedure);
    return queue.remove(probe);
  }

  /** Runs the chore once and then re-queues it through {@link #add(InlineChore)}. */
  private void execInlineChore(InlineChore chore) {
    chore.run();
    add(chore);
  }

  /**
   * Handles a delayed procedure taken from the queue.
   * <p>
   * TODO: treat this as a normal procedure, add it to the scheduler and let one of the workers
   * handle it. Today a {@link ProcedureInMemoryChore} is treated like an {@link InlineChore}.
   */
  private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) {
    final Procedure<TEnvironment> proc = delayed.getObject();
    if (!(proc instanceof ProcedureInMemoryChore)) {
      executeTimedoutProcedure(proc);
      return;
    }

    executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) proc);
    // If the procedure is in a waiting state again, put it back in the queue
    // with its refreshed timeout timestamp.
    proc.updateTimestamp();
    if (proc.isWaiting()) {
      delayed.setTimeout(proc.getTimeoutTimestamp());
      queue.add(delayed);
    }
  }

  /**
   * Runs one periodic execution of the chore, but only if it is currently waiting.
   * <p>
   * {@link ProcedureInMemoryChore} is a special case that acts as a chore; rather than bringing
   * the Chore class in, this timeout thread is reused for it. Anything thrown by the chore is
   * logged and ignored.
   */
  private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) {
    if (chore.isWaiting()) {
      try {
        chore.periodicExecute(executor.getEnvironment());
      } catch (Throwable t) {
        LOG.error("Ignoring {} exception: {}", chore, t.getMessage(), t);
      }
    }
  }

  /**
   * Handles a procedure that received a timeout. If the procedure itself does not handle it
   * ({@code setTimeoutFailure} returns {@code true}), the root procedure's stack is aborted,
   * the procedure is updated in the store and added to the front of the scheduler for rollback.
   * @param proc the procedure that timed out
   */
  protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
    if (!proc.setTimeoutFailure(executor.getEnvironment())) {
      return;
    }

    final long rootId = executor.getRootProcedureId(proc);
    final RootProcedureState<TEnvironment> rootState = executor.getProcStack(rootId);
    rootState.abort();
    executor.getStore().update(proc);
    executor.getScheduler().addFront(proc);
  }
}