001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Optional;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * help assign and release a worker for each remote task. For each worker, concurrent running task
033 * should be no more than maxTasks. If a task failed to acquire a worker, it will suspend and wait
034 * for workers available.
035 */
036@InterfaceAudience.Private
037public class WorkerAssigner implements ServerListener {
038  private final Map<ServerName, Integer> currentWorkers = new HashMap<>();
039  private final MasterServices master;
040  private final ProcedureEvent<?> event;
041  private final int maxTasks;
042
043  public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> event) {
044    this.maxTasks = maxTasks;
045    this.master = master;
046    this.event = event;
047    // ServerManager might be null in a test context where we are mocking; allow for this
048    ServerManager sm = this.master.getServerManager();
049    if (sm != null) {
050      sm.registerListener(this);
051    }
052  }
053
054  public synchronized Optional<ServerName> acquire() {
055    List<ServerName> serverList = master.getServerManager().getOnlineServersList();
056    Collections.shuffle(serverList);
057    Optional<ServerName> worker = serverList.stream()
058      .filter(
059        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
060      .findAny();
061    worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
062      availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
063    return worker;
064  }
065
066  public synchronized void release(ServerName serverName) {
067    currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
068  }
069
070  public void suspend(Procedure<?> proc) {
071    event.suspend();
072    event.suspendIfNotReady(proc);
073  }
074
075  public void wake(MasterProcedureScheduler scheduler) {
076    if (!event.isReady()) {
077      event.wake(scheduler);
078    }
079  }
080
081  @Override
082  public void serverAdded(ServerName worker) {
083    this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
084  }
085
086  public synchronized void addUsedWorker(ServerName worker) {
087    // load used worker when master restart
088    currentWorkers.compute(worker, (serverName,
089      availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
090  }
091
092  public Integer getAvailableWorker(ServerName serverName) {
093    return currentWorkers.get(serverName);
094  }
095}