001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Optional;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.procedure2.Procedure;
027import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
028import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * help assign and release a worker for each remote task. For each worker, concurrent running task
033 * should be no more than maxTasks. If a task failed to acquire a worker, it will suspend and wait
034 * for workers available.
035 */
036@InterfaceAudience.Private
037public class WorkerAssigner implements ServerListener {
038  private final Map<ServerName, Integer> currentWorkers = new HashMap<>();
039  private final MasterServices master;
040  private final ProcedureEvent<?> event;
041  private final int maxTasks;
042
043  public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> event) {
044    this.maxTasks = maxTasks;
045    this.master = master;
046    this.event = event;
047    // ServerManager might be null in a test context where we are mocking; allow for this
048    ServerManager sm = this.master.getServerManager();
049    if (sm != null) {
050      sm.registerListener(this);
051    }
052  }
053
054  public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException {
055    List<ServerName> serverList = master.getServerManager().getOnlineServersList();
056    Collections.shuffle(serverList);
057    Optional<ServerName> worker = serverList.stream()
058      .filter(
059        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
060      .findAny();
061    if (worker.isPresent()) {
062      ServerName sn = worker.get();
063      currentWorkers.compute(sn, (serverName,
064        availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
065      return sn;
066    } else {
067      event.suspend();
068      event.suspendIfNotReady(proc);
069      throw new ProcedureSuspendedException();
070    }
071  }
072
073  public synchronized void release(ServerName serverName) {
074    currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
075    if (!event.isReady()) {
076      event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
077    }
078  }
079
080  @Override
081  public synchronized void serverAdded(ServerName worker) {
082    if (!event.isReady()) {
083      event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
084    }
085  }
086
087  public synchronized void addUsedWorker(ServerName worker) {
088    // load used worker when master restart
089    currentWorkers.compute(worker, (serverName,
090      availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
091  }
092
093  public Integer getAvailableWorker(ServerName serverName) {
094    return currentWorkers.get(serverName);
095  }
096}