001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.LinkedBlockingQueue;
022import org.apache.hadoop.hbase.PleaseHoldException;
023import org.apache.hadoop.hbase.client.ConnectionUtils;
024import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
025import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
026import org.apache.hadoop.hbase.util.Threads;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
032import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
036
037/**
038 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure.
039 */
040@InterfaceAudience.Private
041class RemoteProcedureResultReporter extends Thread {
042
043  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class);
044
045  private static final int MAX_BATCH = 100;
046
047  private final HRegionServer server;
048
049  private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>();
050
051  public RemoteProcedureResultReporter(HRegionServer server) {
052    this.server = server;
053  }
054
055  public void complete(long procId, long initiatingMasterActiveTime, Throwable error,
056    byte[] procReturnValue) {
057    RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId)
058      .setInitiatingMasterActiveTime(initiatingMasterActiveTime);
059    if (error != null) {
060      LOG.debug("Failed to complete execution of pid={}", procId, error);
061      builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
062        ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error));
063    } else {
064      LOG.debug("Successfully complete execution of pid={}", procId);
065      builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
066    }
067    if (procReturnValue != null) {
068      builder.setProcResultData(ByteString.copyFrom(procReturnValue));
069    }
070    results.add(builder.build());
071  }
072
073  @Override
074  public void run() {
075    ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder();
076    int tries = 0;
077    while (!server.isStopped()) {
078      if (builder.getResultCount() == 0) {
079        try {
080          builder.addResult(results.take());
081        } catch (InterruptedException e) {
082          Thread.currentThread().interrupt();
083          continue;
084        }
085      }
086      while (builder.getResultCount() < MAX_BATCH) {
087        RemoteProcedureResult result = results.poll();
088        if (result == null) {
089          break;
090        }
091        builder.addResult(result);
092      }
093      ReportProcedureDoneRequest request = builder.build();
094      try {
095        server.reportProcedureDone(builder.build());
096        builder.clear();
097        tries = 0;
098      } catch (IOException e) {
099        boolean pause =
100          e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException;
101        long pauseTime;
102        if (pause) {
103          // Do backoff else we flood the Master with requests.
104          pauseTime = ConnectionUtils.getPauseTime(server.getRetryPauseTime(), tries);
105        } else {
106          pauseTime = server.getRetryPauseTime(); // Reset.
107        }
108        LOG.info("Failed procedure report " + TextFormat.shortDebugString(request) + "; retry (#"
109          + tries + ")"
110          + (pause
111            ? " after " + pauseTime + "ms delay (Master is coming online...)."
112            : " immediately."),
113          e);
114        Threads.sleep(pauseTime);
115        tries++;
116      }
117    }
118  }
119}