001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master; 020 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.ClusterId; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.util.CommonFSUtils; 030import org.apache.hadoop.hbase.util.FSUtils; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 037 038/** 039 * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client 040 * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not 041 * already exist. Standby masters just read the information from the file system. This class is 042 * thread-safe. 043 * 044 * TODO: Make it a singleton without affecting concurrent junit tests. 045 */ 046@InterfaceAudience.Private 047public class CachedClusterId { 048 049 public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class); 050 private static final int MAX_FETCH_TIMEOUT_MS = 10000; 051 052 private final Path rootDir; 053 private final FileSystem fs; 054 055 // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to 056 // avoid multiple fetches from FS and let only one thread fetch the information. 057 AtomicBoolean fetchInProgress = new AtomicBoolean(false); 058 059 // When true, it means that the cluster ID has been fetched successfully from fs. 060 private AtomicBoolean isClusterIdSet = new AtomicBoolean(false); 061 // Immutable once set and read multiple times. 062 private ClusterId clusterId; 063 064 private final Server server; 065 066 // cache stats for testing. 067 private AtomicInteger cacheMisses = new AtomicInteger(0); 068 069 public CachedClusterId(Server server, Configuration conf) throws IOException { 070 this.rootDir = CommonFSUtils.getRootDir(conf); 071 this.fs = rootDir.getFileSystem(conf); 072 this.server = server; 073 } 074 075 /** 076 * Succeeds only once, when setting to a non-null value. Overwrites are not allowed. 077 */ 078 private void setClusterId(ClusterId id) { 079 if (id == null || isClusterIdSet.get()) { 080 return; 081 } 082 clusterId = id; 083 isClusterIdSet.set(true); 084 } 085 086 /** 087 * Returns a cached copy of the cluster ID. null if the cache is not populated. 088 */ 089 private String getClusterId() { 090 if (!isClusterIdSet.get()) { 091 return null; 092 } 093 // It is ok to read without a lock since clusterId is immutable once set. 094 return clusterId.toString(); 095 } 096 097 /** 098 * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress, 099 * synchronously fetches the cluster ID and sets it. If an attempt is already in progress, 100 * returns right away and the caller is expected to wait for the fetch to finish. 101 * @return true if the attempt is done, false if another thread is already fetching it. 102 */ 103 private boolean attemptFetch() { 104 if (fetchInProgress.compareAndSet(false, true)) { 105 // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify 106 // the waiting threads. 107 try { 108 cacheMisses.incrementAndGet(); 109 setClusterId(FSUtils.getClusterId(fs, rootDir)); 110 } catch (IOException e) { 111 LOG.warn("Error fetching cluster ID", e); 112 } finally { 113 Preconditions.checkState(fetchInProgress.compareAndSet(true, false)); 114 synchronized (fetchInProgress) { 115 fetchInProgress.notifyAll(); 116 } 117 } 118 return true; 119 } 120 return false; 121 } 122 123 private void waitForFetchToFinish() throws InterruptedException { 124 synchronized (fetchInProgress) { 125 while (fetchInProgress.get()) { 126 // We don't want the fetches to block forever, for example if there are bugs 127 // of missing notifications. 128 fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS); 129 } 130 } 131 } 132 133 /** 134 * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached 135 * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are 136 * trying get from a clean cache. 137 * 138 * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does 139 * not exist on the file system or if the server initiated a tear down. 140 */ 141 public String getFromCacheOrFetch() { 142 if (server.isStopping() || server.isStopped()) { 143 return null; 144 } 145 String id = getClusterId(); 146 if (id != null) { 147 return id; 148 } 149 if (!attemptFetch()) { 150 // A fetch is in progress. 151 try { 152 waitForFetchToFinish(); 153 } catch (InterruptedException e) { 154 // pass and return whatever is in the cache. 155 } 156 } 157 return getClusterId(); 158 } 159 160 @VisibleForTesting 161 public int getCacheStats() { 162 return cacheMisses.get(); 163 } 164}