001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
022
023import com.google.errorprone.annotations.RestrictedApi;
024import java.io.IOException;
025import java.lang.management.MemoryType;
026import java.net.BindException;
027import java.net.InetAddress;
028import java.net.InetSocketAddress;
029import java.util.concurrent.atomic.AtomicBoolean;
030import javax.servlet.http.HttpServlet;
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.lang3.SystemUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.client.AsyncClusterConnection;
037import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
041import org.apache.hadoop.hbase.conf.ConfigurationManager;
042import org.apache.hadoop.hbase.conf.ConfigurationObserver;
043import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
044import org.apache.hadoop.hbase.executor.ExecutorService;
045import org.apache.hadoop.hbase.fs.HFileSystem;
046import org.apache.hadoop.hbase.http.InfoServer;
047import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
048import org.apache.hadoop.hbase.ipc.RpcServerInterface;
049import org.apache.hadoop.hbase.master.HMaster;
050import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
051import org.apache.hadoop.hbase.regionserver.ChunkCreator;
052import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
053import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
054import org.apache.hadoop.hbase.regionserver.ShutdownHook;
055import org.apache.hadoop.hbase.security.Superusers;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.security.UserProvider;
058import org.apache.hadoop.hbase.security.access.AccessChecker;
059import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
060import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
061import org.apache.hadoop.hbase.util.Addressing;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.util.FSTableDescriptors;
065import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
066import org.apache.hadoop.hbase.util.Pair;
067import org.apache.hadoop.hbase.util.Sleeper;
068import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
069import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
070import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075/**
076 * Base class for hbase services, such as master or region server.
077 */
078@InterfaceAudience.Private
079public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends Thread
080  implements Server, ConfigurationObserver, ConnectionRegistryEndpoint {
081
082  private static final Logger LOG = LoggerFactory.getLogger(HBaseServerBase.class);
083
084  protected final Configuration conf;
085
086  // Go down hard. Used if file system becomes unavailable and also in
087  // debugging and unit tests.
088  protected final AtomicBoolean abortRequested = new AtomicBoolean(false);
089
090  // Set when a report to the master comes back with a message asking us to
091  // shutdown. Also set by call to stop when debugging or running unit tests
092  // of HRegionServer in isolation.
093  protected volatile boolean stopped = false;
094
095  // Only for testing
096  private boolean isShutdownHookInstalled = false;
097
098  /**
099   * This servers startcode.
100   */
101  protected final long startcode;
102
103  protected final UserProvider userProvider;
104
105  // zookeeper connection and watcher
106  protected final ZKWatcher zooKeeper;
107
108  /**
109   * The server name the Master sees us as. Its made from the hostname the master passes us, port,
110   * and server startcode. Gets set after registration against Master.
111   */
112  protected ServerName serverName;
113
114  protected final R rpcServices;
115
116  /**
117   * hostname specified by hostname config
118   */
119  protected final String useThisHostnameInstead;
120
121  /**
122   * Provide online slow log responses from ringbuffer
123   */
124  protected final NamedQueueRecorder namedQueueRecorder;
125
126  /**
127   * Configuration manager is used to register/deregister and notify the configuration observers
128   * when the regionserver is notified that there was a change in the on disk configs.
129   */
130  protected final ConfigurationManager configurationManager;
131
132  /**
133   * ChoreService used to schedule tasks that we want to run periodically
134   */
135  protected final ChoreService choreService;
136
137  // Instance of the hbase executor executorService.
138  protected final ExecutorService executorService;
139
140  // Cluster Status Tracker
141  protected final ClusterStatusTracker clusterStatusTracker;
142
143  protected final CoordinatedStateManager csm;
144
145  // Info server. Default access so can be used by unit tests. REGIONSERVER
146  // is name of the webapp and the attribute name used stuffing this instance
147  // into web context.
148  protected InfoServer infoServer;
149
150  protected HFileSystem dataFs;
151
152  protected HFileSystem walFs;
153
154  protected Path dataRootDir;
155
156  protected Path walRootDir;
157
158  protected final int msgInterval;
159
160  // A sleeper that sleeps for msgInterval.
161  protected final Sleeper sleeper;
162
163  /**
164   * Go here to get table descriptors.
165   */
166  protected TableDescriptors tableDescriptors;
167
168  /**
169   * The asynchronous cluster connection to be shared by services.
170   */
171  protected AsyncClusterConnection asyncClusterConnection;
172
173  /**
174   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
175   * entries. Used for serving ClientMetaService.
176   */
177  protected final MetaRegionLocationCache metaRegionLocationCache;
178
179  protected final NettyEventLoopGroupConfig eventLoopGroupConfig;
180
181  /**
182   * If running on Windows, do windows-specific setup.
183   */
184  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
185    if (!SystemUtils.IS_OS_WINDOWS) {
186      HBasePlatformDependent.handle("HUP", (number, name) -> {
187        conf.reloadConfiguration();
188        cm.notifyAllObservers(conf);
189      });
190    }
191  }
192
193  /**
194   * Setup our cluster connection if not already initialized.
195   */
196  protected final synchronized void setupClusterConnection() throws IOException {
197    if (asyncClusterConnection == null) {
198      InetSocketAddress localAddress =
199        new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0);
200      User user = userProvider.getCurrent();
201      asyncClusterConnection =
202        ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user);
203    }
204  }
205
  /**
   * Initializes the WAL and data filesystems, their root directories, and the table descriptors
   * accessor.
   * <p>
   * NOTE(review): statement order matters — 'fs.defaultFS' is pointed at the WAL dir before
   * constructing walFs, then re-pointed at hbase.rootdir before constructing dataFs, so
   * default-FS-relative hadoop accessors hit the right filesystem in each phase.
   * @throws IOException if a filesystem or root directory cannot be resolved
   */
  protected final void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    // The WAL dir defaults to hbase.rootdir when hbase.wal.dir is not set.
    String walDirUri = CommonFSUtils.getDirUri(this.conf,
      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
    // set WAL's uri
    if (walDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, walDirUri);
    }
    // init the WALFs
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    String rootDirUri =
      CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
    if (rootDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
    }
    // init the filesystem
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    // Read-only unless the concrete server may update descriptors; caching per subclass choice.
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }
233
  /**
   * Builds the common server machinery: netty event loop config, rpc services, server name,
   * security logins, zookeeper watcher, filesystems, chore/executor services, cluster trackers
   * (cluster mode only), and finally the web UI.
   * <p>
   * NOTE(review): initialization order is significant — e.g. rpcServices must be created before
   * the server name can be derived from its bound address, and the zookeeper client login happens
   * before the ZKWatcher is constructed.
   * @param conf server configuration
   * @param name thread name for this server
   * @throws ZooKeeperConnectionException if the zookeeper watcher cannot be set up
   * @throws IOException on rpc, login, or filesystem initialization failure
   */
  public HBaseServerBase(Configuration conf, String name)
    throws ZooKeeperConnectionException, IOException {
    super(name); // thread name
    this.conf = conf;
    this.eventLoopGroupConfig =
      NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
    this.startcode = EnvironmentEdgeManager.currentTime();
    this.userProvider = UserProvider.instantiate(conf);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
    this.sleeper = new Sleeper(this.msgInterval, this);
    this.namedQueueRecorder = createNamedQueueRecord();
    this.rpcServices = createRpcServices();
    useThisHostnameInstead = getUseThisHostnameInstead(conf);
    // Prefer the configured hostname override; otherwise use the rpc bind address's hostname.
    InetSocketAddress addr = rpcServices.getSocketAddress();
    String hostName = StringUtils.isBlank(useThisHostnameInstead)
      ? addr.getHostName()
      : this.useThisHostnameInstead;
    serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
    // login the zookeeper client principal (if using security)
    ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);
    zooKeeper =
      new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());

    this.configurationManager = new ConfigurationManager();
    // Installs a SIGHUP-triggered config reload (only on non-Windows platforms).
    setupWindows(conf, configurationManager);

    initializeFileSystem();

    this.choreService = new ChoreService(getName(), true);
    this.executorService = new ExecutorService(getName());

    this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);

    if (clusterMode()) {
      // ZK-coordinated WAL splitting is optional; csm stays null when it is disabled.
      if (
        conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
      ) {
        csm = new ZkCoordinatedStateManager(this);
      } else {
        csm = null;
      }
      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    } else {
      csm = null;
      clusterStatusTracker = null;
    }
    putUpWebUI();
  }
289
290  /**
291   * Puts up the webui.
292   */
293  private void putUpWebUI() throws IOException {
294    int port =
295      this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT);
296    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
297
298    if (this instanceof HMaster) {
299      port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
300      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
301    }
302    // -1 is for disabling info server
303    if (port < 0) {
304      return;
305    }
306
307    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
308      String msg = "Failed to start http info server. Address " + addr
309        + " does not belong to this host. Correct configuration parameter: "
310        + "hbase.regionserver.info.bindAddress";
311      LOG.error(msg);
312      throw new IOException(msg);
313    }
314    // check if auto port bind enabled
315    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
316    while (true) {
317      try {
318        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
319        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
320        configureInfoServer(infoServer);
321        this.infoServer.start();
322        break;
323      } catch (BindException e) {
324        if (!auto) {
325          // auto bind disabled throw BindException
326          LOG.error("Failed binding http info server to port: " + port);
327          throw e;
328        }
329        // auto bind enabled, try to use another port
330        LOG.info("Failed binding http info server to port: " + port);
331        port++;
332        LOG.info("Retry starting http info server with port: " + port);
333      }
334    }
335    port = this.infoServer.getPort();
336    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
337    int masterInfoPort =
338      conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
339    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
340    conf.setInt(HConstants.MASTER_INFO_PORT, port);
341  }
342
343  /**
344   * Sets the abort state if not already set.
345   * @return True if abortRequested set to True successfully, false if an abort is already in
346   *         progress.
347   */
348  protected final boolean setAbortRequested() {
349    return abortRequested.compareAndSet(false, true);
350  }
351
  @Override
  public boolean isStopped() {
    // Set by a stop request (master message, debugging, or unit tests); see the field comment.
    return stopped;
  }

  @Override
  public boolean isAborted() {
    // True once an abort has been requested; the flag is one-way (see setAbortRequested).
    return abortRequested.get();
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  @Override
  public AsyncClusterConnection getAsyncClusterConnection() {
    // May be null until setupClusterConnection() has been called.
    return asyncClusterConnection;
  }

  @Override
  public ZKWatcher getZooKeeper() {
    return zooKeeper;
  }
376
377  protected final void shutdownChore(ScheduledChore chore) {
378    if (chore != null) {
379      chore.shutdown();
380    }
381  }
382
  /**
   * Sets up the global MemStore chunk creator (MSLAB chunk pool) when MSLAB is enabled; no-op
   * otherwise.
   * @param hMemManager heap memory manager passed through to the chunk creator
   */
  protected final void initializeMemStoreChunkCreator(HeapMemoryManager hMemManager) {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool
      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
      // it.
      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
      long globalMemStoreSize = pair.getFirst();
      boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
      // When off heap memstore in use, take full area for chunk pool.
      float poolSizePercentage = offheap
        ? 1.0F
        : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      // init the chunkCreator
      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
        initialCountPercentage, hMemManager, indexChunkSizePercent);
    }
  }
405
  /**
   * Cancels the chores the concrete server scheduled; invoked from {@link #stopChoreService()}.
   */
  protected abstract void stopChores();

  /**
   * Stops the subclass chores and then shuts down the chore service itself.
   */
  protected final void stopChoreService() {
    // clean up the scheduled chores
    if (choreService != null) {
      LOG.info("Shutdown chores and chore service");
      stopChores();
      // cancel the remaining scheduled chores (in case we missed out any)
      // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
      choreService.shutdown();
    }
  }
418
419  protected final void stopExecutorService() {
420    if (executorService != null) {
421      LOG.info("Shutdown executor service");
422      executorService.shutdown();
423    }
424  }
425
426  protected final void closeClusterConnection() {
427    if (asyncClusterConnection != null) {
428      LOG.info("Close async cluster connection");
429      try {
430        this.asyncClusterConnection.close();
431      } catch (IOException e) {
432        // Although the {@link Closeable} interface throws an {@link
433        // IOException}, in reality, the implementation would never do that.
434        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
435      }
436    }
437  }
438
439  protected final void stopInfoServer() {
440    if (this.infoServer != null) {
441      LOG.info("Stop info server");
442      try {
443        this.infoServer.stop();
444      } catch (Exception e) {
445        LOG.error("Failed to stop infoServer", e);
446      }
447    }
448  }
449
450  protected final void closeZooKeeper() {
451    if (this.zooKeeper != null) {
452      LOG.info("Close zookeeper");
453      this.zooKeeper.close();
454    }
455  }
456
457  /**
458   * In order to register ShutdownHook, this method is called when HMaster and HRegionServer are
459   * started. For details, please refer to HBASE-26951
460   */
461  protected final void installShutdownHook() {
462    ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
463    isShutdownHookInstalled = true;
464  }
465
466  @RestrictedApi(explanation = "Should only be called in tests", link = "",
467      allowedOnPath = ".*/src/test/.*")
468  public boolean isShutdownHookInstalled() {
469    return isShutdownHookInstalled;
470  }
471
  @Override
  public ServerName getServerName() {
    return serverName;
  }

  @Override
  public ChoreService getChoreService() {
    return choreService;
  }

  /**
   * @return Return table descriptors implementation.
   */
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /** @return the hbase executor service used by this server. */
  public ExecutorService getExecutorService() {
    return executorService;
  }

  /** @return the access checker owned by the rpc services. */
  public AccessChecker getAccessChecker() {
    return rpcServices.getAccessChecker();
  }

  /** @return the ZK permission watcher owned by the rpc services. */
  public ZKPermissionWatcher getZKPermissionWatcher() {
    return rpcServices.getZkPermissionWatcher();
  }

  @Override
  public CoordinatedStateManager getCoordinatedStateManager() {
    // Null outside cluster mode, or when ZK-coordinated WAL splitting is disabled.
    return csm;
  }
505
506  @Override
507  public Connection createConnection(Configuration conf) throws IOException {
508    User user = UserProvider.instantiate(conf).getCurrent();
509    return ConnectionFactory.createConnection(conf, null, user);
510  }
511
512  /**
513   * @return Return the rootDir.
514   */
515  public Path getDataRootDir() {
516    return dataRootDir;
517  }
518
519  @Override
520  public FileSystem getFileSystem() {
521    return dataFs;
522  }
523
524  /**
525   * @return Return the walRootDir.
526   */
527  public Path getWALRootDir() {
528    return walRootDir;
529  }
530
531  /**
532   * @return Return the walFs.
533   */
534  public FileSystem getWALFileSystem() {
535    return walFs;
536  }
537
538  /**
539   * @return True if the cluster is up.
540   */
541  public boolean isClusterUp() {
542    return !clusterMode() || this.clusterStatusTracker.isClusterUp();
543  }
544
545  /**
546   * @return time stamp in millis of when this server was started
547   */
548  public long getStartcode() {
549    return this.startcode;
550  }
551
552  public InfoServer getInfoServer() {
553    return infoServer;
554  }
555
556  public int getMsgInterval() {
557    return msgInterval;
558  }
559
560  /**
561   * get NamedQueue Provider to add different logs to ringbuffer n
562   */
563  public NamedQueueRecorder getNamedQueueRecorder() {
564    return this.namedQueueRecorder;
565  }
566
567  public RpcServerInterface getRpcServer() {
568    return rpcServices.getRpcServer();
569  }
570
571  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
572    return eventLoopGroupConfig;
573  }
574
575  public R getRpcServices() {
576    return rpcServices;
577  }
578
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public MetaRegionLocationCache getMetaRegionLocationCache() {
    return this.metaRegionLocationCache;
  }

  /**
   * Reload the configuration from disk and notify all registered configuration observers.
   */
  public void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    // Reload the configuration from disk.
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }

  @Override
  public String toString() {
    return getServerName().toString();
  }
599
  /** @return true if this process may create the base znode on startup. */
  protected abstract boolean canCreateBaseZNode();

  /** @return short process name; used in the ZKWatcher identifier and info server name. */
  protected abstract String getProcessName();

  /** Creates the rpc services implementation for this server type. */
  protected abstract R createRpcServices() throws IOException;

  /** @return the configured hostname to use instead of the resolved one; blank/null when unset. */
  protected abstract String getUseThisHostnameInstead(Configuration conf) throws IOException;

  /** Logs in the server principal (when running with secure Hadoop). */
  protected abstract void login(UserProvider user, String host) throws IOException;

  /** Creates the recorder that feeds the named-queue ring buffers. */
  protected abstract NamedQueueRecorder createNamedQueueRecord();

  /** Registers server-type-specific servlets on the given info server. */
  protected abstract void configureInfoServer(InfoServer infoServer);

  /** @return the servlet class that serves the /dump debug page. */
  protected abstract Class<? extends HttpServlet> getDumpServlet();

  /** @return true if this server may update table descriptors (controls read-only mode of FSTableDescriptors). */
  protected abstract boolean canUpdateTableDescriptor();

  /** @return whether FSTableDescriptors should cache descriptors. */
  protected abstract boolean cacheTableDescriptor();

  /** @return true when running as part of a distributed cluster — presumably false in local/standalone mode; confirm against subclasses. */
  protected abstract boolean clusterMode();
621}