/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs tasks on a period, such as the check for stuck workers.
 * <p>
 * Tasks are held in a {@link DelayQueue} and are executed on this thread once they are handed
 * back by the queue. Two task types are understood: {@link InlineChore}, which is run and then
 * re-queued, and {@link DelayedProcedure}, which wraps a {@link Procedure} that is either run as
 * an in-memory chore or failed with a timeout.
 * @see InlineChore
 */
@InterfaceAudience.Private
class TimeoutExecutorThread<TEnvironment> extends StoppableThread {

  private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);

  private final ProcedureExecutor<TEnvironment> executor;

  private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

  /**
   * Creates the timeout thread as a daemon thread.
   * @param executor the procedure executor whose running state, environment, store and scheduler
   *                 this thread uses
   * @param group    the thread group passed to the superclass
   * @param name     the thread name passed to the superclass
   */
  public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group,
    String name) {
    super(group, name);
    setDaemon(true);
    this.executor = executor;
  }

  /**
   * Enqueues {@link DelayedUtil#DELAYED_POISON}. {@link #run()} does not execute that element; it
   * only goes back to re-check {@code executor.isRunning()}.
   */
  @Override
  public void sendStopSignal() {
    queue.add(DelayedUtil.DELAYED_POISON);
  }

  /**
   * Main loop: while the executor reports that it is running, takes the next task from the queue
   * and dispatches it according to its type.
   */
  @Override
  public void run() {
    while (executor.isRunning()) {
      // NOTE(review): presumably returns null when nothing became due within the 20 second wait;
      // confirm against DelayedUtil.takeWithoutInterrupt.
      final DelayedWithTimeout task =
        DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS);
      if (task == null || task == DelayedUtil.DELAYED_POISON) {
        // the executor may be shutting down,
        // and the task is just the shutdown request
        continue;
      }
      LOG.trace("Executing {}", task);

      // execute the task
      if (task instanceof InlineChore) {
        execInlineChore((InlineChore) task);
      } else if (task instanceof DelayedProcedure) {
        // The queue element type is not generic, so the type argument cannot be verified at
        // runtime. Only add(Procedure) and execDelayedProcedure() in this class enqueue
        // DelayedProcedure instances, and both use TEnvironment.
        @SuppressWarnings("unchecked")
        final DelayedProcedure<TEnvironment> delayed = (DelayedProcedure<TEnvironment>) task;
        execDelayedProcedure(delayed);
      } else {
        LOG.error("CODE-BUG unknown timeout task type {}", task);
      }
    }
  }

  /**
   * Refreshes the timeout of the given chore and enqueues it.
   * @param chore the chore to schedule
   */
  public void add(InlineChore chore) {
    chore.refreshTimeout();
    queue.add(chore);
  }

  /**
   * Wraps the given procedure in a {@link DelayedProcedure} and enqueues it.
   * @param procedure the procedure to schedule
   */
  public void add(Procedure<TEnvironment> procedure) {
    LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
      procedure.getTimeoutTimestamp());
    queue.add(new DelayedProcedure<>(procedure));
  }

  /**
   * Removes the queued entry matching a new {@link DelayedProcedure} wrapper around the given
   * procedure.
   * <p>
   * NOTE(review): the match relies on how {@link DelayedProcedure} defines equality, which is not
   * visible in this file.
   * @param procedure the procedure to unschedule
   * @return the result of {@link DelayQueue#remove(Object)}
   */
  public boolean remove(Procedure<TEnvironment> procedure) {
    return queue.remove(new DelayedProcedure<>(procedure));
  }

  /**
   * Runs the chore on this thread, then re-queues it through {@link #add(InlineChore)}, which
   * refreshes its timeout first.
   */
  private void execInlineChore(InlineChore chore) {
    chore.run();
    add(chore);
  }

  /**
   * Handles a due {@link DelayedProcedure}: a {@link ProcedureInMemoryChore} is executed as a
   * chore and re-queued if it is still waiting; any other procedure is treated as timed out.
   */
  private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) {
    // TODO: treat this as a normal procedure, add it to the scheduler and
    // let one of the workers handle it.
    // Today we consider ProcedureInMemoryChore as InlineChores
    Procedure<TEnvironment> procedure = delayed.getObject();
    if (procedure instanceof ProcedureInMemoryChore) {
      executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure);
      // if the procedure is in a waiting state again, put it back in the queue
      procedure.updateTimestamp();
      if (procedure.isWaiting()) {
        delayed.setTimeout(procedure.getTimeoutTimestamp());
        queue.add(delayed);
      }
    } else {
      executeTimedoutProcedure(procedure);
    }
  }

  /**
   * Calls {@code periodicExecute} on the chore with the executor's environment, unless the chore
   * is not in a waiting state. Anything thrown by the chore is logged and not rethrown.
   */
  private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) {
    if (!chore.isWaiting()) {
      return;
    }

    // The ProcedureInMemoryChore is a special case, and it acts as a chore.
    // instead of bringing the Chore class in, we reuse this timeout thread for
    // this special case.
    try {
      chore.periodicExecute(executor.getEnvironment());
    } catch (Throwable e) {
      // Throwable is caught deliberately so the failure does not propagate out of this method.
      LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e);
    }
  }

  /**
   * Signals a timeout to the given procedure. When {@code setTimeoutFailure} returns true, the
   * root procedure's stack is aborted, the procedure is updated in the store, and it is added to
   * the front of the scheduler.
   * @param proc the procedure whose timeout expired
   */
  protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
    // The procedure received a timeout. if the procedure itself does not handle it,
    // call abort() and add the procedure back in the queue for rollback.
    if (proc.setTimeoutFailure(executor.getEnvironment())) {
      long rootProcId = executor.getRootProcedureId(proc);
      RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId);
      procStack.abort();
      executor.getStore().update(proc);
      executor.getScheduler().addFront(proc);
    }
  }
}