001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.conf.ConfigurationObserver;
026import org.apache.hadoop.hbase.ipc.RpcServer;
027import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
028import org.apache.hadoop.hbase.master.MasterFileSystem;
029import org.apache.hadoop.hbase.master.MasterServices;
030import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
031import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
032import org.apache.hadoop.hbase.procedure2.Procedure;
033import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
034import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
035import org.apache.hadoop.hbase.security.Superusers;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.util.CancelableProgressable;
038import org.apache.hadoop.hbase.util.FSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@InterfaceAudience.Private
045@InterfaceStability.Evolving
046public class MasterProcedureEnv implements ConfigurationObserver {
047  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureEnv.class);
048
049  @InterfaceAudience.Private
050  public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery {
051    private final MasterServices master;
052
053    public WALStoreLeaseRecovery(final MasterServices master) {
054      this.master = master;
055    }
056
057    @Override
058    public void recoverFileLease(final FileSystem fs, final Path path) throws IOException {
059      final Configuration conf = master.getConfiguration();
060      final FSUtils fsUtils = FSUtils.getInstance(fs, conf);
061      fsUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() {
062        @Override
063        public boolean progress() {
064          LOG.debug("Recover Procedure Store log lease: " + path);
065          return isRunning();
066        }
067      });
068    }
069
070    private boolean isRunning() {
071      return !master.isStopped() && !master.isStopping() && !master.isAborted();
072    }
073  }
074
075  private final RSProcedureDispatcher remoteDispatcher;
076  private final MasterProcedureScheduler procSched;
077  private final MasterServices master;
078
079  public MasterProcedureEnv(final MasterServices master) {
080    this(master, new RSProcedureDispatcher(master));
081  }
082
083  public MasterProcedureEnv(final MasterServices master,
084      final RSProcedureDispatcher remoteDispatcher) {
085    this.master = master;
086    this.procSched = new MasterProcedureScheduler(
087      procId -> master.getMasterProcedureExecutor().getProcedure(procId));
088    this.remoteDispatcher = remoteDispatcher;
089  }
090
091  public User getRequestUser() {
092    return RpcServer.getRequestUser().orElse(Superusers.getSystemUser());
093  }
094
095  public MasterServices getMasterServices() {
096    return master;
097  }
098
099  public Configuration getMasterConfiguration() {
100    return master.getConfiguration();
101  }
102
103  public AssignmentManager getAssignmentManager() {
104    return master.getAssignmentManager();
105  }
106
107  public MasterCoprocessorHost getMasterCoprocessorHost() {
108    return master.getMasterCoprocessorHost();
109  }
110
111  public MasterProcedureScheduler getProcedureScheduler() {
112    return procSched;
113  }
114
115  public RSProcedureDispatcher getRemoteDispatcher() {
116    return remoteDispatcher;
117  }
118
119  public ReplicationPeerManager getReplicationPeerManager() {
120    return master.getReplicationPeerManager();
121  }
122
123  public MasterFileSystem getMasterFileSystem() {
124    return master.getMasterFileSystem();
125  }
126
127  public boolean isRunning() {
128    if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
129    return master.getMasterProcedureExecutor().isRunning();
130  }
131
132  public boolean isInitialized() {
133    return master.isInitialized();
134  }
135
136  public boolean waitInitialized(Procedure<?> proc) {
137    return master.getInitializedEvent().suspendIfNotReady(proc);
138  }
139
140  public void setEventReady(ProcedureEvent<?> event, boolean isReady) {
141    if (isReady) {
142      event.wake(procSched);
143    } else {
144      event.suspend();
145    }
146  }
147
148  @Override
149  public void onConfigurationChange(Configuration conf) {
150    master.getMasterProcedureExecutor().refreshConfiguration(conf);
151  }
152}