001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.LinkedBlockingQueue; 022import org.apache.hadoop.hbase.PleaseHoldException; 023import org.apache.hadoop.hbase.client.ConnectionUtils; 024import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 025import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 026import org.apache.hadoop.hbase.util.Threads; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 035 036/** 037 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure. 038 */ 039@InterfaceAudience.Private 040class RemoteProcedureResultReporter extends Thread { 041 042 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class); 043 044 // Time to pause if master says 'please hold'. Make configurable if needed. 045 private static final int INIT_PAUSE_TIME_MS = 1000; 046 047 private static final int MAX_BATCH = 100; 048 049 private final HRegionServer server; 050 051 private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>(); 052 053 public RemoteProcedureResultReporter(HRegionServer server) { 054 this.server = server; 055 } 056 057 public void complete(long procId, Throwable error) { 058 RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId); 059 if (error != null) { 060 LOG.debug("Failed to complete execution of pid={}", procId, error); 061 builder.setStatus(RemoteProcedureResult.Status.ERROR).setError( 062 ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error)); 063 } else { 064 LOG.debug("Successfully complete execution of pid={}", procId); 065 builder.setStatus(RemoteProcedureResult.Status.SUCCESS); 066 } 067 results.add(builder.build()); 068 } 069 070 @Override 071 public void run() { 072 ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder(); 073 int tries = 0; 074 while (!server.isStopped()) { 075 if (builder.getResultCount() == 0) { 076 try { 077 builder.addResult(results.take()); 078 } catch (InterruptedException e) { 079 Thread.currentThread().interrupt(); 080 continue; 081 } 082 } 083 while (builder.getResultCount() < MAX_BATCH) { 084 RemoteProcedureResult result = results.poll(); 085 if (result == null) { 086 break; 087 } 088 builder.addResult(result); 089 } 090 ReportProcedureDoneRequest request = builder.build(); 091 try { 092 server.reportProcedureDone(builder.build()); 093 builder.clear(); 094 tries = 0; 095 } catch (IOException e) { 096 boolean pause = 097 e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException; 098 long pauseTime; 099 if (pause) { 100 // Do backoff else we flood the Master with requests. 101 pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); 102 } else { 103 pauseTime = INIT_PAUSE_TIME_MS; // Reset. 104 } 105 LOG.info("Failed procedure report " + TextFormat.shortDebugString(request) + "; retry (#" + 106 tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)." 107 : " immediately."), 108 e); 109 Threads.sleep(pauseTime); 110 tries++; 111 } 112 } 113 } 114}