/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import java.util.List;
import java.util.Map;

/**
 * A wrapper for a runnable for a group of actions for a single regionserver.
 * <p>
 * This can be used to build up the actions that should be taken (see
 * {@link #add(java.util.Map.Entry)}) and then, when {@link #run()} is invoked, wait for the
 * configured sleep time before handing off to the runnable supplied through
 * {@link #setRunner(Runnable)}.
 * </p>
 * <p>
 * This class exists to simulate using a ScheduledExecutorService with just a regular
 * ExecutorService and Runnables. It is used for legacy reasons in the client; this could
 * only be removed if we change the expectations in HTable around the pool the client is able to
 * pass in and even if we deprecate the current APIs would require keeping this class around
 * for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
 * </p>
 * <p>
 * A runner must be supplied through {@link #setRunner(Runnable)} before {@link #run()} is
 * invoked; otherwise {@link #run()} fails with a {@link NullPointerException} once the delay
 * has elapsed.
 * </p>
 */
@InterfaceAudience.Private
public class DelayingRunner implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(DelayingRunner.class);

  /** Monitor that {@link #sleep()} waits on. */
  private final Object sleepLock = new Object();
  /**
   * Checked under {@link #sleepLock} before each wait; when true the sleep ends early.
   * Nothing in this class currently sets it to true.
   */
  private boolean triggerWake = false;
  /** Total delay, in milliseconds, to wait before running the delegate. */
  private final long sleepTime;
  /** Actions accumulated through the constructor and {@link #add(java.util.Map.Entry)}. */
  private final MultiAction actions = new MultiAction();
  /** Delegate executed after the delay; null until {@link #setRunner(Runnable)} is called. */
  private Runnable runnable;

  /**
   * Creates a runner that delays for the given time and starts out holding the given actions.
   *
   * @param sleepTime how long, in milliseconds, {@link #run()} waits before running the delegate
   * @param e entry whose key and value are added to the underlying {@link MultiAction}
   *          (the key is presumably a region name -- confirm against {@code MultiAction.add})
   */
  public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action>> e) {
    this.sleepTime = sleepTime;
    add(e);
  }

  /**
   * Sets the delegate that {@link #run()} executes once the delay is over.
   *
   * @param runner the runnable to execute after sleeping
   */
  public void setRunner(Runnable runner) {
    this.runnable = runner;
  }

  /**
   * Waits for the configured sleep time and then runs the delegate.
   * <p>
   * The delegate is run even when the wait is interrupted. In that case a warning is logged and
   * the thread's interrupt status is restored after the delegate has finished, so the delegate
   * itself runs with the interrupt status cleared while the code that owns the thread can still
   * observe the interruption afterwards.
   * </p>
   */
  @Override
  public void run() {
    final boolean interrupted = !sleep();
    if (interrupted) {
      LOG.warn("Interrupted while sleeping for expected sleep time {} ms", sleepTime);
    }
    //TODO maybe we should consider switching to a listenableFuture for the actual callable and
    // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
    // even if we get interrupted here, but for now, we still need to run so we decrement the
    // outstanding tasks
    try {
      this.runnable.run();
    } finally {
      if (interrupted) {
        // Object.wait() cleared the flag when it threw; put it back only after the delegate
        // has run so the interruption is not lost but also does not disturb the delegate.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Sleep for an expected amount of time.
   * <p>
   * This is nearly a copy of what the Sleeper does, but with the ability to know if you
   * got interrupted while sleeping.
   * </p>
   *
   * @return {@code true} if the sleep completed successfully (or was ended early through
   *         {@code triggerWake}), {@code false} if the sleep was interrupted.
   */
  private boolean sleep() {
    final long startTime = EnvironmentEdgeManager.currentTime();
    long waitTime = sleepTime;
    while (waitTime > 0) {
      try {
        synchronized (sleepLock) {
          if (triggerWake) {
            break;
          }
          sleepLock.wait(waitTime);
        }
      } catch (InterruptedException iex) {
        return false;
      }
      // wait() may return before the timeout elapses, so work out what is left. The remainder
      // is always measured from the fixed start time against the full sleep time; subtracting
      // the total elapsed time from the previous remainder would count earlier waits twice.
      long elapsed = EnvironmentEdgeManager.currentTime() - startTime;
      waitTime = sleepTime - elapsed;
    }
    return true;
  }

  /**
   * Adds the entry's actions to the group held by this runner.
   *
   * @param e entry whose key and value are passed to {@code MultiAction.add}
   */
  public void add(Map.Entry<byte[], List<Action>> e) {
    actions.add(e.getKey(), e.getValue());
  }

  /**
   * @return the {@link MultiAction} accumulated so far (the internal instance, not a copy)
   */
  public MultiAction getActions() {
    return actions;
  }

  /**
   * @return the delay, in milliseconds, that {@link #run()} waits before running the delegate
   */
  public long getSleepTime() {
    return sleepTime;
  }
}