001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.concurrent.TimeUnit;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
029
030/**
031 * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date.
032 * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The
033 * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of
034 * on-demand refreshes we expect that any attempts two should be spaced at least
035 * {@code minIntervalSecsConfigName} seconds apart.
036 */
037@InterfaceAudience.Private
038final class RegistryEndpointsRefresher {
039
040  private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
041
042  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
043
044  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
045
046  private final Thread thread;
047  private final Refresher refresher;
048  private final long initialDelayMs;
049  private final long periodicRefreshMs;
050  private final long minTimeBetweenRefreshesMs;
051
052  private boolean refreshNow = false;
053  private boolean stopped = false;
054
055  synchronized void stop() {
056    stopped = true;
057    notifyAll();
058  }
059
060  private long getRefreshIntervalMs(boolean firstRefresh) {
061    if (refreshNow) {
062      return minTimeBetweenRefreshesMs;
063    }
064    if (firstRefresh) {
065      return initialDelayMs;
066    }
067    return periodicRefreshMs;
068  }
069
070  // The main loop for the refresh thread.
071  private void mainLoop() {
072    long lastRefreshTime = EnvironmentEdgeManager.currentTime();
073    boolean firstRefresh = true;
074    for (;;) {
075      synchronized (this) {
076        for (;;) {
077          if (stopped) {
078            LOG.info("Registry end points refresher loop exited.");
079            return;
080          }
081          // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
082          // otherwise wait until periodicRefreshMs elapsed
083          long waitTime = getRefreshIntervalMs(firstRefresh)
084            - (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
085          if (waitTime <= 0) {
086            // we are going to refresh, reset this flag
087            firstRefresh = false;
088            refreshNow = false;
089            break;
090          }
091          try {
092            wait(waitTime);
093          } catch (InterruptedException e) {
094            LOG.warn("Interrupted during wait", e);
095            Thread.currentThread().interrupt();
096            continue;
097          }
098        }
099      }
100      LOG.debug("Attempting to refresh registry end points");
101      try {
102        refresher.refresh();
103      } catch (IOException e) {
104        LOG.warn("Error refresh registry end points", e);
105      }
106      // We do not think it is a big deal to fail one time, so no matter what is refresh result, we
107      // just update this refresh time and wait for the next round. If later this becomes critical,
108      // could change to only update this value when we have done a successful refreshing.
109      lastRefreshTime = EnvironmentEdgeManager.currentTime();
110      LOG.debug("Finished refreshing registry end points");
111    }
112  }
113
114  @FunctionalInterface
115  public interface Refresher {
116
117    void refresh() throws IOException;
118  }
119
120  private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
121    long minTimeBetweenRefreshesMs, Refresher refresher) {
122    this.initialDelayMs = initialDelayMs;
123    this.periodicRefreshMs = periodicRefreshMs;
124    this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
125    this.refresher = refresher;
126    thread = new Thread(this::mainLoop);
127    thread.setName("Registry-endpoints-refresh-end-points");
128    thread.setDaemon(true);
129    thread.start();
130  }
131
132  /**
133   * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
134   * See class comment for details.
135   */
136  synchronized void refreshNow() {
137    refreshNow = true;
138    notifyAll();
139  }
140
141  /**
142   * Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via
143   * {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
144   * refreshing of endpoints.
145   */
146  static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
147    String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
148    long periodicRefreshMs = TimeUnit.SECONDS
149      .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
150    if (periodicRefreshMs <= 0) {
151      return null;
152    }
153    long initialDelayMs = Math.max(1,
154      TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
155    long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
156      .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
157    Preconditions.checkArgument(minTimeBetweenRefreshesMs <= periodicRefreshMs);
158    return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
159      minTimeBetweenRefreshesMs, refresher);
160  }
161}