001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master; 020 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.ClusterId; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.util.CommonFSUtils; 030import org.apache.hadoop.hbase.util.FSUtils; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 036 037/** 038 * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client 039 * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not 040 * already exist. Standby masters just read the information from the file system. This class is 041 * thread-safe. 042 * 043 * TODO: Make it a singleton without affecting concurrent junit tests. 044 */ 045@InterfaceAudience.Private 046public class CachedClusterId { 047 048 public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class); 049 private static final int MAX_FETCH_TIMEOUT_MS = 10000; 050 051 private final Path rootDir; 052 private final FileSystem fs; 053 054 // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to 055 // avoid multiple fetches from FS and let only one thread fetch the information. 056 AtomicBoolean fetchInProgress = new AtomicBoolean(false); 057 058 // When true, it means that the cluster ID has been fetched successfully from fs. 059 private AtomicBoolean isClusterIdSet = new AtomicBoolean(false); 060 // Immutable once set and read multiple times. 061 private ClusterId clusterId; 062 063 private final Server server; 064 065 // cache stats for testing. 066 private AtomicInteger cacheMisses = new AtomicInteger(0); 067 068 public CachedClusterId(Server server, Configuration conf) throws IOException { 069 this.rootDir = CommonFSUtils.getRootDir(conf); 070 this.fs = rootDir.getFileSystem(conf); 071 this.server = server; 072 } 073 074 /** 075 * Succeeds only once, when setting to a non-null value. Overwrites are not allowed. 076 */ 077 private void setClusterId(ClusterId id) { 078 if (id == null || isClusterIdSet.get()) { 079 return; 080 } 081 clusterId = id; 082 isClusterIdSet.set(true); 083 } 084 085 /** 086 * Returns a cached copy of the cluster ID. null if the cache is not populated. 087 */ 088 private String getClusterId() { 089 if (!isClusterIdSet.get()) { 090 return null; 091 } 092 // It is ok to read without a lock since clusterId is immutable once set. 093 return clusterId.toString(); 094 } 095 096 /** 097 * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress, 098 * synchronously fetches the cluster ID and sets it. If an attempt is already in progress, 099 * returns right away and the caller is expected to wait for the fetch to finish. 100 * @return true if the attempt is done, false if another thread is already fetching it. 101 */ 102 private boolean attemptFetch() { 103 if (fetchInProgress.compareAndSet(false, true)) { 104 // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify 105 // the waiting threads. 106 try { 107 cacheMisses.incrementAndGet(); 108 setClusterId(FSUtils.getClusterId(fs, rootDir)); 109 } catch (IOException e) { 110 LOG.warn("Error fetching cluster ID", e); 111 } finally { 112 Preconditions.checkState(fetchInProgress.compareAndSet(true, false)); 113 synchronized (fetchInProgress) { 114 fetchInProgress.notifyAll(); 115 } 116 } 117 return true; 118 } 119 return false; 120 } 121 122 private void waitForFetchToFinish() throws InterruptedException { 123 synchronized (fetchInProgress) { 124 while (fetchInProgress.get()) { 125 // We don't want the fetches to block forever, for example if there are bugs 126 // of missing notifications. 127 fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS); 128 } 129 } 130 } 131 132 /** 133 * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached 134 * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are 135 * trying get from a clean cache. 136 * 137 * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does 138 * not exist on the file system or if the server initiated a tear down. 139 */ 140 public String getFromCacheOrFetch() { 141 if (server.isStopping() || server.isStopped()) { 142 return null; 143 } 144 String id = getClusterId(); 145 if (id != null) { 146 return id; 147 } 148 if (!attemptFetch()) { 149 // A fetch is in progress. 150 try { 151 waitForFetchToFinish(); 152 } catch (InterruptedException e) { 153 // pass and return whatever is in the cache. 154 } 155 } 156 return getClusterId(); 157 } 158 159 public int getCacheStats() { 160 return cacheMisses.get(); 161 } 162}