001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import java.io.IOException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025import org.apache.hadoop.hbase.conf.ConfigurationObserver; 026import org.apache.hadoop.hbase.ipc.RpcServer; 027import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 028import org.apache.hadoop.hbase.master.MasterFileSystem; 029import org.apache.hadoop.hbase.master.MasterServices; 030import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 031import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 032import org.apache.hadoop.hbase.procedure2.Procedure; 033import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 034import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 035import org.apache.hadoop.hbase.security.Superusers; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.hadoop.hbase.util.CancelableProgressable; 038import org.apache.hadoop.hbase.util.FSUtils; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.yetus.audience.InterfaceStability; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@InterfaceAudience.Private 045@InterfaceStability.Evolving 046public class MasterProcedureEnv implements ConfigurationObserver { 047 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureEnv.class); 048 049 @InterfaceAudience.Private 050 public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery { 051 private final MasterServices master; 052 053 public WALStoreLeaseRecovery(final MasterServices master) { 054 this.master = master; 055 } 056 057 @Override 058 public void recoverFileLease(final FileSystem fs, final Path path) throws IOException { 059 final Configuration conf = master.getConfiguration(); 060 final FSUtils fsUtils = FSUtils.getInstance(fs, conf); 061 fsUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { 062 @Override 063 public boolean progress() { 064 LOG.debug("Recover Procedure Store log lease: " + path); 065 return isRunning(); 066 } 067 }); 068 } 069 070 private boolean isRunning() { 071 return !master.isStopped() && !master.isStopping() && !master.isAborted(); 072 } 073 } 074 075 private final RSProcedureDispatcher remoteDispatcher; 076 private final MasterProcedureScheduler procSched; 077 private final MasterServices master; 078 079 public MasterProcedureEnv(final MasterServices master) { 080 this(master, new RSProcedureDispatcher(master)); 081 } 082 083 public MasterProcedureEnv(final MasterServices master, 084 final RSProcedureDispatcher remoteDispatcher) { 085 this.master = master; 086 this.procSched = new MasterProcedureScheduler( 087 procId -> master.getMasterProcedureExecutor().getProcedure(procId)); 088 this.remoteDispatcher = remoteDispatcher; 089 } 090 091 public User getRequestUser() { 092 return RpcServer.getRequestUser().orElse(Superusers.getSystemUser()); 093 } 094 095 public MasterServices getMasterServices() { 096 return master; 097 } 098 099 public Configuration getMasterConfiguration() { 100 return master.getConfiguration(); 101 } 102 103 public AssignmentManager getAssignmentManager() { 104 return master.getAssignmentManager(); 105 } 106 107 public MasterCoprocessorHost getMasterCoprocessorHost() { 108 return master.getMasterCoprocessorHost(); 109 } 110 111 public MasterProcedureScheduler getProcedureScheduler() { 112 return procSched; 113 } 114 115 public RSProcedureDispatcher getRemoteDispatcher() { 116 return remoteDispatcher; 117 } 118 119 public ReplicationPeerManager getReplicationPeerManager() { 120 return master.getReplicationPeerManager(); 121 } 122 123 public MasterFileSystem getMasterFileSystem() { 124 return master.getMasterFileSystem(); 125 } 126 127 public boolean isRunning() { 128 if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; 129 return master.getMasterProcedureExecutor().isRunning(); 130 } 131 132 public boolean isInitialized() { 133 return master.isInitialized(); 134 } 135 136 public boolean waitInitialized(Procedure<?> proc) { 137 return master.getInitializedEvent().suspendIfNotReady(proc); 138 } 139 140 public void setEventReady(ProcedureEvent<?> event, boolean isReady) { 141 if (isReady) { 142 event.wake(procSched); 143 } else { 144 event.suspend(); 145 } 146 } 147 148 @Override 149 public void onConfigurationChange(Configuration conf) { 150 master.getMasterProcedureExecutor().refreshConfiguration(conf); 151 } 152}