/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client RPCs
 * that fetch the cluster ID. ClusterID is only created by an active master if one does not already
 * exist. Standby masters just read the information from the file system. This class is thread-safe.
 * TODO: Make it a singleton without affecting concurrent junit tests.
 */
@InterfaceAudience.Private
public class CachedClusterId {

  public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class);
  // Upper bound on a single wait() call in waitForFetchToFinish(); the wait is retried in a loop
  // for as long as a fetch is still in progress.
  private static final int MAX_FETCH_TIMEOUT_MS = 10000;

  private final Path rootDir;
  private final FileSystem fs;

  // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to
  // avoid multiple fetches from FS and let only one thread fetch the information.
  // This object also serves as the monitor for wait()/notifyAll(), so it must never be
  // reassigned; hence it is final.
  final AtomicBoolean fetchInProgress = new AtomicBoolean(false);

  // When true, it means that the cluster ID has been fetched successfully from fs.
  private final AtomicBoolean isClusterIdSet = new AtomicBoolean(false);
  // Immutable once set and read multiple times. Written before isClusterIdSet is flipped to true
  // and only read after isClusterIdSet has been observed as true.
  private ClusterId clusterId;

  private final Server server;

  // cache stats for testing.
  private final AtomicInteger cacheMisses = new AtomicInteger(0);

  /**
   * Creates an empty cache. The root directory and its file system are resolved from the given
   * configuration; no cluster ID is read at construction time.
   * @param server server whose stopping/stopped state is consulted before serving requests
   * @param conf   configuration used to resolve the root directory and its file system
   * @throws IOException if the root directory or its file system cannot be resolved
   */
  public CachedClusterId(Server server, Configuration conf) throws IOException {
    this.rootDir = CommonFSUtils.getRootDir(conf);
    this.fs = rootDir.getFileSystem(conf);
    this.server = server;
  }

  /**
   * Succeeds only once, when setting to a non-null value. Overwrites are not allowed.
   */
  private void setClusterId(ClusterId id) {
    if (id == null || isClusterIdSet.get()) {
      return;
    }
    // Assign the value first and flip the flag afterwards, so that any thread that sees the flag
    // as true also sees the assigned clusterId.
    clusterId = id;
    isClusterIdSet.set(true);
  }

  /**
   * Returns a cached copy of the cluster ID. null if the cache is not populated.
   */
  private String getClusterId() {
    if (!isClusterIdSet.get()) {
      return null;
    }
    // It is ok to read without a lock since clusterId is immutable once set.
    return clusterId.toString();
  }

  /**
   * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress,
   * synchronously fetches the cluster ID and sets it. If an attempt is already in progress, returns
   * right away and the caller is expected to wait for the fetch to finish.
   * @return true if the attempt is done, false if another thread is already fetching it.
   */
  private boolean attemptFetch() {
    if (fetchInProgress.compareAndSet(false, true)) {
      // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify
      // the waiting threads.
      try {
        cacheMisses.incrementAndGet();
        setClusterId(FSUtils.getClusterId(fs, rootDir));
      } catch (IOException e) {
        LOG.warn("Error fetching cluster ID", e);
      } finally {
        // Clear the in-progress flag whether or not the fetch succeeded, then wake up waiters.
        Preconditions.checkState(fetchInProgress.compareAndSet(true, false));
        synchronized (fetchInProgress) {
          fetchInProgress.notifyAll();
        }
      }
      return true;
    }
    return false;
  }

  /**
   * Blocks the calling thread until no fetch is in progress.
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private void waitForFetchToFinish() throws InterruptedException {
    synchronized (fetchInProgress) {
      while (fetchInProgress.get()) {
        // We don't want the fetches to block forever, for example if there are bugs
        // of missing notifications.
        fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS);
      }
    }
  }

  /**
   * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached copy
   * and is thread-safe. Optimized to do a single fetch when multiple threads are trying to get
   * from a clean cache.
   * <p>
   * If the calling thread is interrupted while waiting for another thread's fetch, the interrupt
   * status is restored and whatever is currently cached (possibly null) is returned.
   * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does not
   *         exist on the file system or if the server initiated a tear down.
   */
  public String getFromCacheOrFetch() {
    if (server.isStopping() || server.isStopped()) {
      return null;
    }
    String id = getClusterId();
    if (id != null) {
      return id;
    }
    if (!attemptFetch()) {
      // A fetch is in progress.
      try {
        waitForFetchToFinish();
      } catch (InterruptedException e) {
        // Restore the interrupt status so that code further up the stack can still observe it,
        // then fall through and return whatever is in the cache.
        Thread.currentThread().interrupt();
      }
    }
    return getClusterId();
  }

  /**
   * Returns the number of times a fetch from the file system has been attempted by this instance.
   */
  public int getCacheStats() {
    return cacheMisses.get();
  }
}