001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.concurrent.ThreadPoolExecutor; 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.errorhandling.ForeignException; 027import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 028import org.apache.hadoop.hbase.master.MasterServices; 029import org.apache.hadoop.hbase.master.MetricsMaster; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.security.access.AccessChecker; 032import org.apache.zookeeper.KeeperException; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 037 038public class SimpleMasterProcedureManager extends MasterProcedureManager { 039 040 public static final String SIMPLE_SIGNATURE = "simple_test"; 041 public static final String SIMPLE_DATA = "simple_test_data"; 042 043 private static final Logger LOG = LoggerFactory.getLogger(SimpleMasterProcedureManager.class); 044 045 private MasterServices master; 046 private ProcedureCoordinator coordinator; 047 048 private boolean done; 049 050 @Override 051 public void stop(String why) { 052 LOG.info("stop: " + why); 053 } 054 055 @Override 056 public boolean isStopped() { 057 return false; 058 } 059 060 @Override 061 public void initialize(MasterServices master, MetricsMaster metricsMaster) 062 throws KeeperException, IOException, UnsupportedOperationException { 063 this.master = master; 064 this.done = false; 065 066 // setup the default procedure coordinator 067 String name = master.getServerName().toString(); 068 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); 069 ProcedureCoordinatorRpcs comms = 070 new ZKProcedureCoordinator(master.getZooKeeper(), getProcedureSignature(), name); 071 072 this.coordinator = new ProcedureCoordinator(comms, tpool); 073 } 074 075 @Override 076 public String getProcedureSignature() { 077 return SIMPLE_SIGNATURE; 078 } 079 080 @Override 081 public byte[] execProcedureWithRet(ProcedureDescription desc) throws IOException { 082 this.done = false; 083 // start the process on the RS 084 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); 085 086 List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); 087 List<String> servers = new ArrayList<>(); 088 for (ServerName sn : serverNames) { 089 servers.add(sn.toString()); 090 } 091 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers); 092 if (proc == null) { 093 String msg = "Failed to submit distributed procedure for '" + getProcedureSignature() + "'"; 094 LOG.error(msg); 095 throw new IOException(msg); 096 } 097 098 HashMap<String, byte[]> returnData = null; 099 try { 100 // wait for the procedure to complete. A timer thread is kicked off that should cancel this 101 // if it takes too long. 102 returnData = proc.waitForCompletedWithRet(); 103 LOG.info("Done waiting - exec procedure for " + desc.getInstance()); 104 this.done = true; 105 } catch (InterruptedException e) { 106 ForeignException ee = 107 new ForeignException("Interrupted while waiting for procdure to finish", e); 108 monitor.receive(ee); 109 Thread.currentThread().interrupt(); 110 } catch (ForeignException e) { 111 monitor.receive(e); 112 } 113 // return the first value for testing 114 return returnData.values().iterator().next(); 115 } 116 117 @Override 118 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 119 throws IOException { 120 } 121 122 @Override 123 public boolean isProcedureDone(ProcedureDescription desc) throws IOException { 124 return done; 125 } 126 127}