/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date.
 * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The
 * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of
 * on-demand refreshes we expect that any two attempts should be spaced at least
 * {@code minIntervalSecsConfigName} seconds apart.
 */
@InterfaceAudience.Private
final class RegistryEndpointsRefresher {

  private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;

  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

  private final Thread thread;
  private final Refresher refresher;
  private final long initialDelayMs;
  private final long periodicRefreshMs;
  private final long minTimeBetweenRefreshesMs;

  // Both flags are only read and written while holding this object's monitor.
  private boolean refreshNow = false;
  private boolean stopped = false;

  /**
   * Asks the refresh thread to exit. The thread is woken up if it is currently waiting; this method
   * does not wait for the thread to terminate.
   */
  synchronized void stop() {
    stopped = true;
    notifyAll();
  }

  /**
   * Returns how long, measured from the previous refresh, the loop should wait before the next one.
   * Must be called while holding this object's monitor because it reads {@code refreshNow}.
   * @param firstRefresh whether no refresh has been performed yet
   * @return the minimum interval if an on-demand refresh was requested, otherwise the initial delay
   *         for the first refresh or the periodic interval for later ones
   */
  private long getRefreshIntervalMs(boolean firstRefresh) {
    if (refreshNow) {
      return minTimeBetweenRefreshesMs;
    }
    if (firstRefresh) {
      return initialDelayMs;
    }
    return periodicRefreshMs;
  }

  // The main loop for the refresh thread.
  private void mainLoop() {
    long lastRefreshTime = EnvironmentEdgeManager.currentTime();
    boolean firstRefresh = true;
    for (;;) {
      synchronized (this) {
        for (;;) {
          if (stopped) {
            LOG.info("Registry end points refresher loop exited.");
            return;
          }
          // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
          // otherwise wait until periodicRefreshMs elapsed
          long waitTime = getRefreshIntervalMs(firstRefresh)
            - (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
          if (waitTime <= 0) {
            // we are going to refresh, reset this flag
            firstRefresh = false;
            refreshNow = false;
            break;
          }
          try {
            wait(waitTime);
          } catch (InterruptedException e) {
            // Re-asserting the interrupt status and looping again would make the next wait() throw
            // immediately, spinning (and logging) until the deadline passes. Treat the interrupt
            // as a request to terminate instead.
            LOG.warn("Interrupted during wait, exiting registry end points refresher loop", e);
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
      // The refresh itself runs outside the monitor so stop() and refreshNow() never block on it.
      LOG.debug("Attempting to refresh registry end points");
      try {
        refresher.refresh();
      } catch (IOException e) {
        LOG.warn("Error refresh registry end points", e);
      }
      // We do not think it is a big deal to fail one time, so no matter what is refresh result, we
      // just update this refresh time and wait for the next round. If later this becomes critical,
      // could change to only update this value when we have done a successful refreshing.
      lastRefreshTime = EnvironmentEdgeManager.currentTime();
      LOG.debug("Finished refreshing registry end points");
    }
  }

  /** Callback that performs the actual refresh of the registry end points. */
  @FunctionalInterface
  public interface Refresher {

    void refresh() throws IOException;
  }

  /**
   * Stores the timing parameters and starts the daemon refresh thread. All fields are assigned
   * before the thread is started.
   */
  private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
    long minTimeBetweenRefreshesMs, Refresher refresher) {
    this.initialDelayMs = initialDelayMs;
    this.periodicRefreshMs = periodicRefreshMs;
    this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
    this.refresher = refresher;
    thread = new Thread(this::mainLoop);
    thread.setName("Registry-endpoints-refresh-end-points");
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
   * See class comment for details.
   */
  synchronized void refreshNow() {
    refreshNow = true;
    notifyAll();
  }

  /**
   * Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via
   * {@code intervalSecsConfigName} is less than or equal to zero, will return null here, which
   * means disable refreshing of endpoints.
   * <p>
   * When {@code initialDelaySecsConfigName} is not configured, the initial delay is one tenth of
   * the periodic interval; the resulting delay is never less than one millisecond.
   * @param conf                       configuration to read the settings from
   * @param initialDelaySecsConfigName key of the delay, in seconds, before the first refresh
   * @param intervalSecsConfigName     key of the periodic refresh interval, in seconds
   * @param minIntervalSecsConfigName  key of the minimum spacing, in seconds, between refreshes
   * @param refresher                  callback invoked for every refresh
   * @return a started refresher, or null if periodic refreshing is disabled
   * @throws IllegalArgumentException if the minimum spacing is greater than the periodic interval
   */
  static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
    String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
    long periodicRefreshSecs =
      conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT);
    long periodicRefreshMs = TimeUnit.SECONDS.toMillis(periodicRefreshSecs);
    if (periodicRefreshMs <= 0) {
      return null;
    }
    // The configured value is in seconds, so the default must be derived from the interval in
    // seconds as well, not from its millisecond value.
    long initialDelaySecs = conf.getLong(initialDelaySecsConfigName, periodicRefreshSecs / 10);
    long initialDelayMs = Math.max(1, TimeUnit.SECONDS.toMillis(initialDelaySecs));
    long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
      .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
    Preconditions.checkArgument(minTimeBetweenRefreshesMs <= periodicRefreshMs,
      "%s (%s ms) must not be greater than %s (%s ms)", minIntervalSecsConfigName,
      minTimeBetweenRefreshesMs, intervalSecsConfigName, periodicRefreshMs);
    return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
      minTimeBetweenRefreshesMs, refresher);
  }
}