/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * The context, and return value, for a single submit/submitAll call.
 * Note on how this class (one AP submit) works. Initially, all requests are split into groups
 * by server; a request is sent to each server in parallel; the RPC calls are not async, so a
 * thread per server is used. Every time some actions fail, regions/locations might have
 * changed, so we re-group them by server and region and send these groups in parallel
 * too. The result, in case of retries, is a "tree" of threads, with each parent exiting after
 * scheduling its children. This is why much of the code doesn't require any synchronization.
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;

  /**
   * Runnable (that can be submitted to a thread pool) that waits until it is time
   * to issue replica calls, finds region replicas, groups the requests by replica and
   * issues the calls (on separate threads, via sendMultiAction).
   * This is done on a separate thread because we don't want to block the user thread for
   * our asynchronous call, and usually we have to wait before making replica calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }

    @Override
    public void run() {
      boolean done = false;
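      // startTime is in milliseconds (set from EnvironmentEdgeManager.currentTime() in
      // startWaitingForReplicaCalls), while primaryCallTimeoutMicroseconds is in microseconds;
      // hence the "* 1000L" below, and the cutoff passed to waitUntilDone is in microseconds.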
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed; they are handled inside addReplicaActionsAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }

    /**
     * Adds replica actions to the per-server action map.
     * @param index Index of the original action.
     * @param actionsByServer The map by server to add it to.
     * @param unknownReplicaActions Collects replica actions whose location is not yet known.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
                                   List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for {}", action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        updateResult(index, new ReplicaResultState(locs.length));
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
              replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }

    private void addReplicaActionsAgain(
        Action action, Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
          action, actionsByServer, nonceGroup);
    }
  }

  /**
   * Runnable (that can be submitted to a thread pool) that submits a MultiAction to a
   * single server. The server call is synchronous, therefore we do it on a thread pool.
   */
  @VisibleForTesting
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;
    @VisibleForTesting
    SingleServerRequestRunnable(
        MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The call itself failed. It may be an error coming from the communication
          // layer, or a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." +
              " Retrying. Server=" + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            updateResult(0, singleResponse.getEntry());
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }
250
251  private final Batch.Callback<CResult> callback;
252  private final BatchErrors errors;
253  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
254  private final ExecutorService pool;
255  private final Set<CancellableRegionServerCallable> callsInProgress;
256
257
258  private final TableName tableName;
259  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where
   * there might be several callers (eventual consistency gets). For other requests,
   * there's one unique call going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to
   * the action index in the initial actions submitted. For most request types, it has nulls for
   * requests that are not done, and a result/exception for those that are done.
   * For eventual-consistency gets, initially the same applies; at some point, replica calls
   * might be started, and a ReplicaResultState is put at the corresponding indices. The
   * returning calls check the type to detect when this is the case. After all calls are done,
   * ReplicaResultState-s are replaced with results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging clearer; we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }

  /** Sync point for calls to multiple replicas for the same user request (Get).
   * Created and put in the results array (we assume replica calls require results) when
   * the replica calls are launched. See results for details of this process.
   * POJO, all fields are public. To modify them, the object itself is locked. */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /** Errors for which it is not decided whether we will report them to user. If one of the
     * calls succeeds, we will discard the errors that may have happened in the other calls. */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }
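
  // Illustrative walk-through of the results[] protocol for one eventual-consistency get,
  // restating the field Javadoc above rather than adding new behavior:
  //   1. results[i] == null                       - no call has completed yet.
  //   2. results[i] = new ReplicaResultState(n)   - replica calls launched; n counts the primary
  //                                                 plus the replica calls still outstanding.
  //   3. Returning calls go through setResult/setError; the first success (or the last failure)
  //      drops callCount to 0 and, under replicaResultLock, replaces the state object with the
  //      user-visible Result or Throwable.
  //   4. decActionCounter(i) runs exactly once for index i, when that final value is stored.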

  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
      long nonceGroup, AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
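    // Worked example of the bookkeeping below: for actions [replica-get, put, replica-get], the
    // first non-replica request (the put, position 1) backfills position 0 into the list, and the
    // later replica-get adds position 2, so replicaGetIndices becomes [0, 2] with
    // hasAnyReplicaGets = true. For an all-gets or all-puts batch, replicaGetIndices stays null.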
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets ? null :
        Collections.newSetFromMap(
            new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }

  @VisibleForTesting
  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  @VisibleForTesting
  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
      ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Groups a list of actions per region server, and sends them.
   *
   * @param currentActions - the list of rows to submit
   * @param numAttempt - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is the first attempt to group and send, and these are not replica actions,
      // we need the replica-call thread (see startWaitingForReplicaCalls).
      sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
          ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(
            actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from the cache, because the previous calls in the loop might have populated it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  private void manageLocationError(Action action, Exception ex) {
    String msg = "Cannot get replica " + action.getReplicaId()
        + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(),
        Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
        ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(
          tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Sends a multi action structure to the servers, after a delay depending on the attempt
   * number. Asynchronous.
   *
   * @param actionsByServer the actions grouped by server
   * @param numAttempt the attempt number.
   * @param actionsForReplicaThread original actions for the replica thread; null on non-first call.
   * @param reuseThread whether the last runnable may be run on the current thread.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
      List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth, to prevent
      // stack overflow; for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control
      // the depth
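      // e.g. with N = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER, attempts that are exact
      // multiples of N always go through pool.submit() instead of runnable.run(), which bounds
      // how deep the run() -> resubmit -> run() recursion on a single send thread can get.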
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
          runnable.run();
        } else {
          try {
            pool.submit(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." +
                  " Server=" + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
      MultiAction multiAction, int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable = createSingleServerRequest(
          multiAction, numAttempt, server, callsInProgress);
      Tracer tracer = Tracer.curThreadTracer();

      if (tracer == null) {
        return Collections.singletonList(runnable);
      } else {
        return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
      }
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }
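    // Regions that resolve to the same backoff share one DelayingRunner; e.g. two throttled
    // regions with the same computed delay end up in a single runner, while regions with no
    // backoff (0) are grouped into a runner that will not sleep.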

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server,
          callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics()
            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      runnable = TraceUtil.wrap(runnable, traceText);
      toReturn.add(runnable);
    }
    return toReturn;
  }

  /**
   * @param server server location where the target region is hosted
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait before it submits a request to the
   * specified server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy()
        .getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
        actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.submit(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Checks whether we can retry and acts accordingly: logs and sets the error status.
   *
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not YES, we won't retry regardless of the settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return the retry decision; YES if the action can be retried.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable,
      ServerName server) {
    if (canRetry == Retry.YES
        && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }

  /**
   * Resubmit all the actions from this multiaction after a failure.
   *
   * @param rsActions  the actions still to do from the initial list
   * @param server   the destination
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(
      MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
        ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    cleanServerCache(server, t);
    int failed = 0;
    int stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      updateCachedLocations(server, regionName, row,
        ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
      for (Action action : e.getValue()) {
        Retry retry = manageError(
            action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }

  /**
   * Log as much info as possible, and, if there is something to replay,
   * submit it again after a back off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay,
                        int numAttempt, int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little beforehand.

    // We have contradicting needs here:
    //  1) We want to get the new location after having slept, as it may change.
    //  2) We want to take into account the location when calculating the sleep time.
    //  3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    //  we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof CallQueueTooBigException) {
      // Give a special check on CQTBE, see #HBASE-17114
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      //  logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
          oldServer, throwable, backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }

  private void logNoResubmit(ServerName oldServer, int numAttempt,
                             int failureCount, Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
          throwable, -1, false, timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }

  /**
   * Called when we receive the result of a server query.
   *
   * @param multiAction    - the multiAction we sent
   * @param server       - the location. It's used as a server name.
   * @param responses      - the response, if any
   * @param numAttempt     - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction, ServerName server,
      MultiResponse responses, int numAttempt) {
    assert responses != null;
    updateStats(server, responses);
    // Success or partial success.
    // Analyze the detailed results. We can still have individual failures to be redone.
    // Two specific throwables are managed:
    //  - DoNotRetryIOException: we continue to retry for other actions
    //  - RegionMovedException: we update the cache with the new region location
    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    List<Action> toReplay = new ArrayList<>();
    Throwable lastException = null;
    int failureCount = 0;
    int failed = 0;
    int stopped = 0;
    Retry retry = null;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();

      Throwable regionException = responses.getExceptions().get(regionName);
      if (regionException != null) {
        cleanServerCache(server, regionException);
      }

      Map<Integer, Object> regionResults =
        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        if (result == null) {
          if (regionException == null) {
            LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName)
              + ", numAttempt:" + numAttempt);
            regionException = new RuntimeException("Invalid response");
          }
          // If the row operation encountered a region-level error, the exception for the
          // action may be null.
          result = regionException;
        }
        // Failure: retry if it makes sense, else update the error lists
        if (result instanceof Throwable) {
          Throwable actionException = (Throwable) result;
          Row row = sentAction.getAction();
          lastException = regionException != null ? regionException
            : ClientExceptionsUtil.findException(actionException);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            updateCachedLocations(server, regionName, row.getRow(), actionException);
          }
          if (retry == null) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            retry = errorsByServer.canTryMore(numAttempt) ?
              Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
          }
          ++failureCount;
          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
            server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
          }
        } else {
          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
          setResult(sentAction, result);
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, lastException);
    }
  }

  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
    Throwable rowException) {
    if (tableName == null) {
      return;
    }
    try {
      asyncProcess.connection
        .updateCachedLocations(tableName, regionName, row, rowException, server);
    } catch (Throwable ex) {
      // That should never happen, but if it did, we want to make sure
      // we still process errors
      LOG.error("Couldn't update cached region locations: " + ex);
    }
  }

  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
    if (callback != null) {
      try {
        //noinspection unchecked
        // TODO: would callback expect a replica region name if it gets one?
        this.callback.update(regionName, row, result);
      } catch (Throwable t) {
        LOG.error("User callback threw an exception for "
          + Bytes.toStringBinary(regionName) + ", ignoring", t);
      }
    }
  }

  private void cleanServerCache(ServerName server, Throwable regionException) {
    if (ClientExceptionsUtil.isMetaClearingException(regionException)) {
      // We want to make sure to clear the cache in case there were location-related exceptions.
      // We don't want to clear the cache for every possible exception that comes through, however.
      asyncProcess.connection.clearCaches(server);
    }
  }

  @VisibleForTesting
  protected void updateStats(ServerName server, MultiResponse resp) {
    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
                           Throwable error, long backOffTime, boolean willRetry, String startTime,
                           int failed, int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName).
        append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries).
        append(", ");

    if (failureCount > 0 || error != null) {
      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=").
          append(error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms").
          append(", operationsToReplay=").append(replaySize);
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; NOT retrying, stopped=").append(stopped).
            append(" because successful operation on other replica");
      }
      if (failed > 0) {
        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
      }
    }

    return sb.toString();
  }

  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    ReplicaResultState state = null;
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to the replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see the state with callCount 0 after locking it, or will not see the state
    // at all because we will have replaced it with the result.
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      updateResult(index, result);
    }

    decActionCounter(index);
  }

  /**
   * Sets the error from a particular action.
   * @param index Original action index.
   * @param row Original request.
   * @param throwable The resulting error.
   * @param server The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    ReplicaResultState state = null;
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same actions with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0: return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        updateResult(index, throwable);
      }
      decActionCounter(index);
    }
  }

  /**
   * Checks if the action is complete; used on error to prevent needless retries.
   * Does not synchronize, assuming element index/field accesses are atomic.
   * This is an opportunistic optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null) && (!(resObj instanceof ReplicaResultState)
        || ((ReplicaResultState)resObj).callCount == 0);
  }

  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
      Object result, ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      updateResult(index, result);
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          updateResult(index, result);
        }
      }
    }

    ReplicaResultState rrs =
        (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable)result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }

  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }

  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
        .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }

  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      waitUntilDone(Long.MAX_VALUE);
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }

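  // The cutoff below is in microseconds since the epoch: callers pass either Long.MAX_VALUE for
  // "no deadline" or startTime * 1000L + primaryCallTimeoutMicroseconds, which is why "now", a
  // millisecond timestamp, is multiplied by 1000 before comparing against it.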
  private boolean waitUntilDone(long cutoff) throws InterruptedException {
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
              + "  actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }

  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }

  /**
   * Creates the server error tracker to use inside the process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return the ServerErrorTracker to use for this request
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        asyncProcess.serverTrackerTimeout, asyncProcess.numTries);
  }

  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
      final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server,
        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
  }

  private void updateResult(int index, Object result) {
    Object current = results[index];
    if (current != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The result is assigned repeatedly! current:" + current
          + ", new:" + result);
      }
    }
    results[index] = result;
  }

  @VisibleForTesting
  long getNumberOfActionsInProgress() {
    return actionsInProgress.get();
  }
}