001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ScheduledExecutorService;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseRpcServicesBase;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.client.AsyncClusterConnection;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.FutureUtils;
037import org.apache.hadoop.hbase.util.RetryCounter;
038import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
039import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
040import org.apache.hadoop.hbase.util.RetryCounterFactory;
041import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
047
048/**
049 * Manage the bootstrap node list at region server side.
050 * <p/>
051 * It will request master first to get the initial set of bootstrap nodes(a sub set of live region
052 * servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
053 * cases, if the cluster is stable, we do not need to request master again until we reach the
054 * request master interval. And if the current number of bootstrap nodes is not enough, we will
055 * request master soon.
056 * <p/>
057 * The algorithm is very simple, as we will always fallback to request master. THe trick here is
058 * that, if we can not get enough bootstrap nodes from master, then the cluster will be small, so it
059 * will not put too much pressure on master if we always request master. And for large clusters, we
060 * will soon get enough bootstrap nodes and stop requesting master.
061 */
062@InterfaceAudience.Private
063public class BootstrapNodeManager {
064
065  private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);
066
067  public static final String REQUEST_MASTER_INTERVAL_SECS =
068    "hbase.server.bootstrap.request_master_interval.secs";
069
070  // default request every 10 minutes
071  public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);
072
073  public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
074    "hbase.server.bootstrap.request_master_min_interval.secs";
075
076  // default 30 seconds
077  public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;
078
079  public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
080    "hbase.server.bootstrap.request_regionserver_interval.secs";
081
082  // default request every 30 seconds
083  public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;
084
085  private static final float JITTER = 0.2f;
086
087  private volatile List<ServerName> nodes = Collections.emptyList();
088
089  private final AsyncClusterConnection conn;
090
091  private final MasterAddressTracker masterAddrTracker;
092
093  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
094    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());
095
096  private final long requestMasterIntervalSecs;
097
098  private final long requestMasterMinIntervalSecs;
099
100  private final long requestRegionServerIntervalSecs;
101
102  private final int maxNodeCount;
103
104  private final RetryCounterFactory retryCounterFactory;
105
106  private RetryCounter retryCounter;
107
108  private long lastRequestMasterTime;
109
110  public BootstrapNodeManager(AsyncClusterConnection conn, MasterAddressTracker masterAddrTracker) {
111    this.conn = conn;
112    this.masterAddrTracker = masterAddrTracker;
113    Configuration conf = conn.getConfiguration();
114    requestMasterIntervalSecs =
115      conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
116    requestMasterMinIntervalSecs =
117      conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
118    requestRegionServerIntervalSecs =
119      conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
120    maxNodeCount = conf.getInt(HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT,
121      HBaseRpcServicesBase.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
122    retryCounterFactory = new RetryCounterFactory(
123      new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
124        .setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
125        .setTimeUnit(TimeUnit.SECONDS));
126    executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
127      TimeUnit.SECONDS);
128  }
129
130  private long getDelay(long delay) {
131    long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
132    return delay + jitterDelay;
133  }
134
135  private void getFromMaster() {
136    List<ServerName> liveRegionServers;
137    try {
138      // get 2 times number of node
139      liveRegionServers =
140        FutureUtils.get(conn.getLiveRegionServers(masterAddrTracker, maxNodeCount * 2));
141    } catch (IOException e) {
142      LOG.warn("failed to get live region servers from master", e);
143      if (retryCounter == null) {
144        retryCounter = retryCounterFactory.create();
145      }
146      executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
147        TimeUnit.SECONDS);
148      return;
149    }
150    retryCounter = null;
151    lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
152    this.nodes = Collections.unmodifiableList(liveRegionServers);
153    if (liveRegionServers.size() < maxNodeCount) {
154      // If the number of live region servers is small, it means the cluster is small, so requesting
155      // master with a higher frequency will not be a big problem, so here we will always request
156      // master to get the live region servers as bootstrap nodes.
157      executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
158        TimeUnit.SECONDS);
159      return;
160    }
161    // schedule tasks to exchange the bootstrap nodes with other region servers.
162    executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
163      TimeUnit.SECONDS);
164  }
165
166  // this method is also used to test whether a given region server is still alive.
167  private void getFromRegionServer() {
168    if (
169      EnvironmentEdgeManager.currentTime() - lastRequestMasterTime
170          >= TimeUnit.SECONDS.toMillis(requestMasterIntervalSecs)
171    ) {
172      // schedule a get from master task immediately if haven't request master for more than
173      // requestMasterIntervalSecs
174      executor.execute(this::getFromMaster);
175      return;
176    }
177    List<ServerName> currentList = this.nodes;
178    ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
179    List<ServerName> otherList;
180    try {
181      otherList = FutureUtils.get(conn.getAllBootstrapNodes(peer));
182    } catch (IOException e) {
183      LOG.warn("failed to request region server {}", peer, e);
184      // remove this region server from the list since it can not respond successfully
185      List<ServerName> newList = currentList.stream().filter(sn -> sn != peer)
186        .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
187      this.nodes = newList;
188      if (newList.size() < maxNodeCount) {
189        // schedule a get from master task immediately
190        executor.execute(this::getFromMaster);
191      } else {
192        executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
193          TimeUnit.SECONDS);
194      }
195      return;
196    }
197    // randomly select new live region server list
198    Set<ServerName> newRegionServers = new HashSet<ServerName>(currentList);
199    newRegionServers.addAll(otherList);
200    List<ServerName> newList = new ArrayList<ServerName>(newRegionServers);
201    Collections.shuffle(newList, ThreadLocalRandom.current());
202    int expectedListSize = maxNodeCount * 2;
203    if (newList.size() <= expectedListSize) {
204      this.nodes = Collections.unmodifiableList(newList);
205    } else {
206      this.nodes =
207        Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
208    }
209    // schedule a new get from region server task
210    executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS);
211  }
212
213  public void stop() {
214    executor.shutdownNow();
215  }
216
217  public List<ServerName> getBootstrapNodes() {
218    return nodes;
219  }
220}