/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Helps assign and release a worker for each remote task. For each worker, the number of
 * concurrently running tasks should be no more than maxTasks. If a task fails to acquire a worker,
 * it will suspend and wait for a worker to become available.
 * <p>
 * Every access to the per-server slot table goes through a {@code synchronized} method of this
 * class, so the table is guarded by this object's monitor.
 */
@InterfaceAudience.Private
public class WorkerAssigner implements ServerListener {
  /**
   * Remaining task slots per server. A server with no entry has not been handed a task yet. Guarded
   * by {@code this}.
   */
  private final Map<ServerName, Integer> currentWorkers = new HashMap<>();
  private final MasterServices master;
  private final ProcedureEvent<?> event;
  private final int maxTasks;

  /**
   * Creates an assigner and, when a {@link ServerManager} is available, registers it as a server
   * listener.
   * @param master   master services used to look up the server manager and procedure executor
   * @param maxTasks number of slots a server starts with the first time it is used
   * @param event    event that procedures are suspended on and woken through
   */
  public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> event) {
    this.maxTasks = maxTasks;
    this.master = master;
    this.event = event;
    // ServerManager might be null in a test context where we are mocking; allow for this
    ServerManager sm = this.master.getServerManager();
    if (sm != null) {
      sm.registerListener(this);
    }
  }

  /**
   * Picks a server that still has a free slot and takes one slot on it. The online server list is
   * shuffled first so the choice is randomized.
   * @return the chosen server, or an empty Optional when no online server has a free slot
   */
  public synchronized Optional<ServerName> acquire() {
    List<ServerName> serverList = master.getServerManager().getOnlineServersList();
    Collections.shuffle(serverList);
    Optional<ServerName> worker = serverList.stream().filter(this::hasFreeSlot).findAny();
    worker.ifPresent(this::takeSlot);
    return worker;
  }

  /**
   * Gives one slot back to the given server. A server that has no entry in the slot table is left
   * untouched.
   * @param serverName server whose slot is being returned
   */
  public synchronized void release(ServerName serverName) {
    currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
  }

  /**
   * Suspends the shared event and then asks it to suspend {@code proc} if it is not ready.
   * @param proc procedure that could not obtain a worker
   */
  public void suspend(Procedure<?> proc) {
    event.suspend();
    event.suspendIfNotReady(proc);
  }

  /**
   * Wakes the shared event through {@code scheduler}, but only when the event is currently not
   * ready.
   * @param scheduler scheduler handed to the event's wake call
   */
  public void wake(MasterProcedureScheduler scheduler) {
    if (!event.isReady()) {
      event.wake(scheduler);
    }
  }

  /**
   * Wakes the shared event using the master's procedure scheduler, so that suspended procedures
   * can retry now that another server is known.
   * @param worker the server that was added; not used beyond triggering the wake-up
   */
  @Override
  public void serverAdded(ServerName worker) {
    this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
  }

  /**
   * Records that {@code worker} is already running one task, without consulting the online server
   * list.
   * @param worker server to charge one slot to
   */
  public synchronized void addUsedWorker(ServerName worker) {
    // load used worker when master restart
    takeSlot(worker);
  }

  /**
   * Looks up the remaining slot count of a server. Synchronized like every other accessor of the
   * slot table, so the read cannot race with a concurrent update.
   * @param serverName server to look up
   * @return the remaining slots, or {@code null} when the server has no entry
   */
  public synchronized Integer getAvailableWorker(ServerName serverName) {
    return currentWorkers.get(serverName);
  }

  /** Tells whether a server is untracked or still has at least one slot; caller holds the lock. */
  private boolean hasFreeSlot(ServerName serverName) {
    Integer remaining = currentWorkers.get(serverName);
    return remaining == null || remaining > 0;
  }

  /** Takes one slot on a server, starting from maxTasks on first use; caller holds the lock. */
  private void takeSlot(ServerName serverName) {
    currentWorkers.compute(serverName,
      (name, remaining) -> remaining == null ? maxTasks - 1 : remaining - 1);
  }
}