001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import java.util.Collections; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.Optional; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.procedure2.Procedure; 027import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 028import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * help assign and release a worker for each remote task. For each worker, concurrent running task 033 * should be no more than maxTasks. If a task failed to acquire a worker, it will suspend and wait 034 * for workers available. 035 */ 036@InterfaceAudience.Private 037public class WorkerAssigner implements ServerListener { 038 private final Map<ServerName, Integer> currentWorkers = new HashMap<>(); 039 private final MasterServices master; 040 private final ProcedureEvent<?> event; 041 private final int maxTasks; 042 043 public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> event) { 044 this.maxTasks = maxTasks; 045 this.master = master; 046 this.event = event; 047 // ServerManager might be null in a test context where we are mocking; allow for this 048 ServerManager sm = this.master.getServerManager(); 049 if (sm != null) { 050 sm.registerListener(this); 051 } 052 } 053 054 public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException { 055 List<ServerName> serverList = master.getServerManager().getOnlineServersList(); 056 Collections.shuffle(serverList); 057 Optional<ServerName> worker = serverList.stream() 058 .filter( 059 serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) 060 .findAny(); 061 if (worker.isPresent()) { 062 ServerName sn = worker.get(); 063 currentWorkers.compute(sn, (serverName, 064 availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1); 065 return sn; 066 } else { 067 event.suspend(); 068 event.suspendIfNotReady(proc); 069 throw new ProcedureSuspendedException(); 070 } 071 } 072 073 public synchronized void release(ServerName serverName) { 074 currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); 075 if (!event.isReady()) { 076 event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 077 } 078 } 079 080 @Override 081 public synchronized void serverAdded(ServerName worker) { 082 if (!event.isReady()) { 083 event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 084 } 085 } 086 087 public synchronized void addUsedWorker(ServerName worker) { 088 // load used worker when master restart 089 currentWorkers.compute(worker, (serverName, 090 availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1); 091 } 092 093 public Integer getAvailableWorker(ServerName serverName) { 094 return currentWorkers.get(serverName); 095 } 096}