View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
24  
25  import java.util.List;
26  import java.util.Map;
27  
28  /**
29   * A wrapper for a runnable for a group of actions for a single regionserver.
30   * <p>
31   * This can be used to build up the actions that should be taken and then
32   * </p>
33   * <p>
34   * This class exists to simulate using a ScheduledExecutorService with just a regular
35   * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could
36   * only be removed if we change the expectations in HTable around the pool the client is able to
37   * pass in and even if we deprecate the current APIs would require keeping this class around
38   * for the interim to bridge between the legacy ExecutorServices and the scheduled pool.
39   * </p>
40   */
41  @InterfaceAudience.Private
42  public class DelayingRunner<T> implements Runnable {
43    private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
44  
45    private final Object sleepLock = new Object();
46    private boolean triggerWake = false;
47    private long sleepTime;
48    private MultiAction<T> actions = new MultiAction<T>();
49    private Runnable runnable;
50  
51    public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
52      this.sleepTime = sleepTime;
53      add(e);
54    }
55  
56    public void setRunner(Runnable runner) {
57      this.runnable = runner;
58    }
59  
60    @Override
61    public void run() {
62      if (!sleep()) {
63        LOG.warn(
64            "Interrupted while sleeping for expected sleep time " + sleepTime + " ms");
65      }
66      //TODO maybe we should consider switching to a listenableFuture for the actual callable and
67      // then handling the results/errors as callbacks. That way we can decrement outstanding tasks
68      // even if we get interrupted here, but for now, we still need to run so we decrement the
69      // outstanding tasks
70      this.runnable.run();
71    }
72  
73    /**
74     * Sleep for an expected amount of time.
75     * <p>
76     * This is nearly a copy of what the Sleeper does, but with the ability to know if you
77     * got interrupted while sleeping.
78     * </p>
79     *
80     * @return <tt>true</tt> if the sleep completely entirely successfully,
81     * but otherwise <tt>false</tt> if the sleep was interrupted.
82     */
83    private boolean sleep() {
84      long now = EnvironmentEdgeManager.currentTime();
85      long startTime = now;
86      long waitTime = sleepTime;
87      while (waitTime > 0) {
88        long woke = -1;
89        try {
90          synchronized (sleepLock) {
91            if (triggerWake) break;
92            sleepLock.wait(waitTime);
93          }
94          woke = EnvironmentEdgeManager.currentTime();
95        } catch (InterruptedException iex) {
96          return false;
97        }
98        // Recalculate waitTime.
99        woke = (woke == -1) ? EnvironmentEdgeManager.currentTime() : woke;
100       waitTime = waitTime - (woke - startTime);
101     }
102     return true;
103   }
104 
105   public void add(Map.Entry<byte[], List<Action<T>>> e) {
106     actions.add(e.getKey(), e.getValue());
107   }
108 
109   public MultiAction<T> getActions() {
110     return actions;
111   }
112 
113   public long getSleepTime() {
114     return sleepTime;
115   }
116 }