001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master;
020
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.ClusterId;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.util.CommonFSUtils;
030import org.apache.hadoop.hbase.util.FSUtils;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038/**
039 * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client
040 * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not
041 * already exist. Standby masters just read the information from the file system. This class is
042 * thread-safe.
043 *
044 * TODO: Make it a singleton without affecting concurrent junit tests.
045 */
046@InterfaceAudience.Private
047public class CachedClusterId {
048
049  public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
050  private static final int MAX_FETCH_TIMEOUT_MS = 10000;
051
052  private final Path rootDir;
053  private final FileSystem fs;
054
055  // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
056  // avoid multiple fetches from FS and let only one thread fetch the information.
057  AtomicBoolean fetchInProgress = new AtomicBoolean(false);
058
059  // When true, it means that the cluster ID has been fetched successfully from fs.
060  private AtomicBoolean isClusterIdSet = new AtomicBoolean(false);
061  // Immutable once set and read multiple times.
062  private ClusterId clusterId;
063
064  private final Server server;
065
066  // cache stats for testing.
067  private AtomicInteger cacheMisses = new AtomicInteger(0);
068
069  public CachedClusterId(Server server, Configuration conf) throws IOException {
070    this.rootDir = CommonFSUtils.getRootDir(conf);
071    this.fs = rootDir.getFileSystem(conf);
072    this.server = server;
073  }
074
075  /**
076   * Succeeds only once, when setting to a non-null value. Overwrites are not allowed.
077   */
078  private void setClusterId(ClusterId id) {
079    if (id == null || isClusterIdSet.get()) {
080      return;
081    }
082    clusterId = id;
083    isClusterIdSet.set(true);
084  }
085
086  /**
087   * Returns a cached copy of the cluster ID. null if the cache is not populated.
088   */
089  private String getClusterId() {
090    if (!isClusterIdSet.get()) {
091      return null;
092    }
093    // It is ok to read without a lock since clusterId is immutable once set.
094    return clusterId.toString();
095  }
096
097  /**
098   * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress,
099   * synchronously fetches the cluster ID and sets it. If an attempt is already in progress,
100   * returns right away and the caller is expected to wait for the fetch to finish.
101   * @return true if the attempt is done, false if another thread is already fetching it.
102   */
103  private boolean attemptFetch() {
104    if (fetchInProgress.compareAndSet(false, true)) {
105      // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify
106      // the waiting threads.
107      try {
108        cacheMisses.incrementAndGet();
109        setClusterId(FSUtils.getClusterId(fs, rootDir));
110      } catch (IOException e) {
111        LOG.warn("Error fetching cluster ID", e);
112      } finally {
113        Preconditions.checkState(fetchInProgress.compareAndSet(true, false));
114        synchronized (fetchInProgress) {
115          fetchInProgress.notifyAll();
116        }
117      }
118      return true;
119    }
120    return false;
121  }
122
123  private void waitForFetchToFinish() throws InterruptedException {
124    synchronized (fetchInProgress) {
125      while (fetchInProgress.get()) {
126        // We don't want the fetches to block forever, for example if there are bugs
127        // of missing notifications.
128        fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS);
129      }
130    }
131  }
132
133  /**
134   * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached
135   * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are
136   * trying get from a clean cache.
137   *
138   * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
139   *     not exist on the file system or if the server initiated a tear down.
140   */
141  public String getFromCacheOrFetch() {
142    if (server.isStopping() || server.isStopped()) {
143      return null;
144    }
145    String id = getClusterId();
146    if (id != null) {
147      return id;
148    }
149    if (!attemptFetch()) {
150      // A fetch is in progress.
151      try {
152        waitForFetchToFinish();
153      } catch (InterruptedException e) {
154        // pass and return whatever is in the cache.
155      }
156    }
157    return getClusterId();
158  }
159
160  @VisibleForTesting
161  public int getCacheStats() {
162    return cacheMisses.get();
163  }
164}