001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.concurrent.ThreadPoolExecutor; 025 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.errorhandling.ForeignException; 028import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 029import org.apache.hadoop.hbase.master.MasterServices; 030import org.apache.hadoop.hbase.master.MetricsMaster; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.security.access.AccessChecker; 034import org.apache.zookeeper.KeeperException; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038public class SimpleMasterProcedureManager extends MasterProcedureManager { 039 040 public static final String SIMPLE_SIGNATURE = "simple_test"; 041 public static final String SIMPLE_DATA = "simple_test_data"; 042 043 private static final Logger LOG = LoggerFactory.getLogger(SimpleMasterProcedureManager.class); 044 045 private MasterServices master; 046 private ProcedureCoordinator coordinator; 047 048 private boolean done; 049 050 @Override 051 public void stop(String why) { 052 LOG.info("stop: " + why); 053 } 054 055 @Override 056 public boolean isStopped() { 057 return false; 058 } 059 060 @Override 061 public void initialize(MasterServices master, MetricsMaster metricsMaster) 062 throws KeeperException, IOException, UnsupportedOperationException { 063 this.master = master; 064 this.done = false; 065 066 // setup the default procedure coordinator 067 String name = master.getServerName().toString(); 068 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); 069 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( 070 master.getZooKeeper(), getProcedureSignature(), name); 071 072 this.coordinator = new ProcedureCoordinator(comms, tpool); 073 } 074 075 @Override 076 public String getProcedureSignature() { 077 return SIMPLE_SIGNATURE; 078 } 079 080 @Override 081 public byte[] execProcedureWithRet(ProcedureDescription desc) throws IOException { 082 this.done = false; 083 // start the process on the RS 084 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); 085 086 List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); 087 List<String> servers = new ArrayList<>(); 088 for (ServerName sn : serverNames) { 089 servers.add(sn.toString()); 090 } 091 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers); 092 if (proc == null) { 093 String msg = "Failed to submit distributed procedure for '" 094 + getProcedureSignature() + "'"; 095 LOG.error(msg); 096 throw new IOException(msg); 097 } 098 099 HashMap<String, byte[]> returnData = null; 100 try { 101 // wait for the procedure to complete. A timer thread is kicked off that should cancel this 102 // if it takes too long. 103 returnData = proc.waitForCompletedWithRet(); 104 LOG.info("Done waiting - exec procedure for " + desc.getInstance()); 105 this.done = true; 106 } catch (InterruptedException e) { 107 ForeignException ee = 108 new ForeignException("Interrupted while waiting for procdure to finish", e); 109 monitor.receive(ee); 110 Thread.currentThread().interrupt(); 111 } catch (ForeignException e) { 112 monitor.receive(e); 113 } 114 // return the first value for testing 115 return returnData.values().iterator().next(); 116 } 117 118 @Override 119 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 120 throws IOException {} 121 122 @Override 123 public boolean isProcedureDone(ProcedureDescription desc) throws IOException { 124 return done; 125 } 126 127}