001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Executors; 024import org.apache.commons.lang3.StringUtils; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey; 027import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 028import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 029import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint; 030import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest; 031import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse; 032import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 033import org.apache.hadoop.util.Shell; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 039import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 040import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 041import org.apache.hbase.thirdparty.com.google.protobuf.Service; 042 043/** 044 * Receives shell commands from the client and executes them blindly. Intended only for use by 045 * {@link ChaosMonkey} via {@link CoprocClusterManager} 046 */ 047@InterfaceAudience.Private 048public class ShellExecEndpointCoprocessor extends ShellExecEndpoint.ShellExecService 049 implements MasterCoprocessor, RegionServerCoprocessor { 050 private static final Logger LOG = LoggerFactory.getLogger(ShellExecEndpointCoprocessor.class); 051 052 public static final String BACKGROUND_DELAY_MS_KEY = "hbase.it.shellexeccoproc.async.delay.ms"; 053 public static final long DEFAULT_BACKGROUND_DELAY_MS = 1_000; 054 055 private final ExecutorService backgroundExecutor; 056 private Configuration conf; 057 058 public ShellExecEndpointCoprocessor() { 059 backgroundExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() 060 .setNameFormat(ShellExecEndpointCoprocessor.class.getSimpleName() + "-{}").setDaemon(true) 061 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Thread {} threw", t, e)).build()); 062 } 063 064 @Override 065 public Iterable<Service> getServices() { 066 return Collections.singletonList(this); 067 } 068 069 @Override 070 public void start(CoprocessorEnvironment env) { 071 conf = env.getConfiguration(); 072 } 073 074 @Override 075 public void shellExec(final RpcController controller, final ShellExecRequest request, 076 final RpcCallback<ShellExecResponse> done) { 077 final String command = request.getCommand(); 078 if (StringUtils.isBlank(command)) { 079 throw new RuntimeException("Request contained an empty command."); 080 } 081 final boolean awaitResponse = !request.hasAwaitResponse() || request.getAwaitResponse(); 082 final String[] subShellCmd = new String[] { "/usr/bin/env", "bash", "-c", command }; 083 final Shell.ShellCommandExecutor shell = new Shell.ShellCommandExecutor(subShellCmd); 084 085 final String msgFmt = 086 "Executing command" + (!awaitResponse ? " on a background thread" : "") + ": {}"; 087 LOG.info(msgFmt, command); 088 089 if (awaitResponse) { 090 runForegroundTask(shell, controller, done); 091 } else { 092 runBackgroundTask(shell, done); 093 } 094 } 095 096 private void runForegroundTask(final Shell.ShellCommandExecutor shell, 097 final RpcController controller, final RpcCallback<ShellExecResponse> done) { 098 ShellExecResponse.Builder builder = ShellExecResponse.newBuilder(); 099 try { 100 doExec(shell, builder); 101 } catch (IOException e) { 102 LOG.error("Failure launching process", e); 103 CoprocessorRpcUtils.setControllerException(controller, e); 104 } 105 done.run(builder.build()); 106 } 107 108 private void runBackgroundTask(final Shell.ShellCommandExecutor shell, 109 final RpcCallback<ShellExecResponse> done) { 110 final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS); 111 backgroundExecutor.execute(() -> { 112 try { 113 // sleep first so that the RPC can ACK. race condition here as we have no means of blocking 114 // until the IPC response has been acknowledged by the client. 115 Thread.sleep(sleepDuration); 116 doExec(shell, ShellExecResponse.newBuilder()); 117 } catch (InterruptedException e) { 118 LOG.warn("Interrupted before launching process.", e); 119 } catch (IOException e) { 120 LOG.error("Failure launching process", e); 121 } 122 }); 123 done.run(ShellExecResponse.newBuilder().build()); 124 } 125 126 /** 127 * Execute {@code shell} and collect results into {@code builder} as side-effects. 128 */ 129 private void doExec(final Shell.ShellCommandExecutor shell, 130 final ShellExecResponse.Builder builder) throws IOException { 131 try { 132 shell.execute(); 133 builder.setExitCode(shell.getExitCode()).setStdout(shell.getOutput()); 134 } catch (Shell.ExitCodeException e) { 135 LOG.warn("Launched process failed", e); 136 builder.setExitCode(e.getExitCode()).setStdout(shell.getOutput()).setStderr(e.getMessage()); 137 } 138 } 139}