001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.concurrent.DelayQueue; 021import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 022import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * Runs task on a period such as check for stuck workers. 029 * @see InlineChore 030 */ 031@InterfaceAudience.Private 032class TimeoutExecutorThread<TEnvironment> extends StoppableThread { 033 034 private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class); 035 036 private final ProcedureExecutor<TEnvironment> executor; 037 038 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); 039 040 public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group, 041 String name) { 042 super(group, name); 043 setDaemon(true); 044 this.executor = executor; 045 } 046 047 @Override 048 public void sendStopSignal() { 049 queue.add(DelayedUtil.DELAYED_POISON); 050 } 051 052 @Override 053 public void run() { 054 while (executor.isRunning()) { 055 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); 056 if (task == null || task == DelayedUtil.DELAYED_POISON) { 057 // the executor may be shutting down, 058 // and the task is just the shutdown request 059 continue; 060 } 061 LOG.trace("Executing {}", task); 062 063 // execute the task 064 if (task instanceof InlineChore) { 065 execInlineChore((InlineChore) task); 066 } else if (task instanceof DelayedProcedure) { 067 execDelayedProcedure((DelayedProcedure<TEnvironment>) task); 068 } else { 069 LOG.error("CODE-BUG unknown timeout task type {}", task); 070 } 071 } 072 } 073 074 public void add(InlineChore chore) { 075 chore.refreshTimeout(); 076 queue.add(chore); 077 } 078 079 public void add(Procedure<TEnvironment> procedure) { 080 LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), 081 procedure.getTimeoutTimestamp()); 082 queue.add(new DelayedProcedure<>(procedure)); 083 } 084 085 public boolean remove(Procedure<TEnvironment> procedure) { 086 return queue.remove(new DelayedProcedure<>(procedure)); 087 } 088 089 private void execInlineChore(InlineChore chore) { 090 chore.run(); 091 add(chore); 092 } 093 094 private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) { 095 // TODO: treat this as a normal procedure, add it to the scheduler and 096 // let one of the workers handle it. 097 // Today we consider ProcedureInMemoryChore as InlineChores 098 Procedure<TEnvironment> procedure = delayed.getObject(); 099 if (procedure instanceof ProcedureInMemoryChore) { 100 executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure); 101 // if the procedure is in a waiting state again, put it back in the queue 102 procedure.updateTimestamp(); 103 if (procedure.isWaiting()) { 104 delayed.setTimeout(procedure.getTimeoutTimestamp()); 105 queue.add(delayed); 106 } 107 } else { 108 executeTimedoutProcedure(procedure); 109 } 110 } 111 112 private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) { 113 if (!chore.isWaiting()) { 114 return; 115 } 116 117 // The ProcedureInMemoryChore is a special case, and it acts as a chore. 118 // instead of bringing the Chore class in, we reuse this timeout thread for 119 // this special case. 120 try { 121 chore.periodicExecute(executor.getEnvironment()); 122 } catch (Throwable e) { 123 LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e); 124 } 125 } 126 127 protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) { 128 // The procedure received a timeout. if the procedure itself does not handle it, 129 // call abort() and add the procedure back in the queue for rollback. 130 if (proc.setTimeoutFailure(executor.getEnvironment())) { 131 long rootProcId = executor.getRootProcedureId(proc); 132 RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId); 133 procStack.abort(); 134 executor.getStore().update(proc); 135 executor.getScheduler().addFront(proc); 136 } 137 } 138}