001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master;
020
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.ClusterId;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.util.CommonFSUtils;
030import org.apache.hadoop.hbase.util.FSUtils;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
036
037/**
038 * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client
039 * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not
040 * already exist. Standby masters just read the information from the file system. This class is
041 * thread-safe.
042 *
043 * TODO: Make it a singleton without affecting concurrent junit tests.
044 */
045@InterfaceAudience.Private
046public class CachedClusterId {
047
048  public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
049  private static final int MAX_FETCH_TIMEOUT_MS = 10000;
050
051  private final Path rootDir;
052  private final FileSystem fs;
053
054  // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
055  // avoid multiple fetches from FS and let only one thread fetch the information.
056  AtomicBoolean fetchInProgress = new AtomicBoolean(false);
057
058  // When true, it means that the cluster ID has been fetched successfully from fs.
059  private AtomicBoolean isClusterIdSet = new AtomicBoolean(false);
060  // Immutable once set and read multiple times.
061  private ClusterId clusterId;
062
063  private final Server server;
064
065  // cache stats for testing.
066  private AtomicInteger cacheMisses = new AtomicInteger(0);
067
068  public CachedClusterId(Server server, Configuration conf) throws IOException {
069    this.rootDir = CommonFSUtils.getRootDir(conf);
070    this.fs = rootDir.getFileSystem(conf);
071    this.server = server;
072  }
073
074  /**
075   * Succeeds only once, when setting to a non-null value. Overwrites are not allowed.
076   */
077  private void setClusterId(ClusterId id) {
078    if (id == null || isClusterIdSet.get()) {
079      return;
080    }
081    clusterId = id;
082    isClusterIdSet.set(true);
083  }
084
085  /**
086   * Returns a cached copy of the cluster ID. null if the cache is not populated.
087   */
088  private String getClusterId() {
089    if (!isClusterIdSet.get()) {
090      return null;
091    }
092    // It is ok to read without a lock since clusterId is immutable once set.
093    return clusterId.toString();
094  }
095
096  /**
097   * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress,
098   * synchronously fetches the cluster ID and sets it. If an attempt is already in progress,
099   * returns right away and the caller is expected to wait for the fetch to finish.
100   * @return true if the attempt is done, false if another thread is already fetching it.
101   */
102  private boolean attemptFetch() {
103    if (fetchInProgress.compareAndSet(false, true)) {
104      // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify
105      // the waiting threads.
106      try {
107        cacheMisses.incrementAndGet();
108        setClusterId(FSUtils.getClusterId(fs, rootDir));
109      } catch (IOException e) {
110        LOG.warn("Error fetching cluster ID", e);
111      } finally {
112        Preconditions.checkState(fetchInProgress.compareAndSet(true, false));
113        synchronized (fetchInProgress) {
114          fetchInProgress.notifyAll();
115        }
116      }
117      return true;
118    }
119    return false;
120  }
121
122  private void waitForFetchToFinish() throws InterruptedException {
123    synchronized (fetchInProgress) {
124      while (fetchInProgress.get()) {
125        // We don't want the fetches to block forever, for example if there are bugs
126        // of missing notifications.
127        fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS);
128      }
129    }
130  }
131
132  /**
133   * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached
134   * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are
135   * trying get from a clean cache.
136   *
137   * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does
138   *     not exist on the file system or if the server initiated a tear down.
139   */
140  public String getFromCacheOrFetch() {
141    if (server.isStopping() || server.isStopped()) {
142      return null;
143    }
144    String id = getClusterId();
145    if (id != null) {
146      return id;
147    }
148    if (!attemptFetch()) {
149      // A fetch is in progress.
150      try {
151        waitForFetchToFinish();
152      } catch (InterruptedException e) {
153        // pass and return whatever is in the cache.
154      }
155    }
156    return getClusterId();
157  }
158
159  public int getCacheStats() {
160    return cacheMisses.get();
161  }
162}