@InterfaceAudience.Private class AsyncProcess extends Object
The caller sends a buffer of operations by calling submit. This class extracts from that list the operations it can send, i.e. the operations on regions that are not considered busy. The process is asynchronous: submit returns as soon as it has finished iterating over the list. The call to submit blocks if, and only if, the maximum number of concurrent tasks has been reached. Alternatively, the caller can call submitAll, in which case all the operations are sent. Each call to submit returns a future-like object that can be used to track operation progress.
The class manages retries internally.
The class can be constructed in regular mode or in "global error" mode. In global error mode, AP tracks errors across all calls (each "future" also has a global view of all errors). That mode is necessary for backward compatibility with HTable behavior, where multiple submissions are made and errors from previous calls can surface through any later put/flush call. In "regular" mode, errors are tracked inside the Future object that is returned. Results are always tracked inside the Future object and can be retrieved once the call has finished. Partial results can also be retrieved if part of a multi-request failed.
This class is thread safe in regular mode; in global error mode, submitting operations and retrieving errors from different threads may not be thread safe. Internally, the class is thread safe enough to handle new submissions and results arriving from older operations at the same time.
Internally, this class works with Row, which means it could theoretically be used for gets as well.
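The selection step described above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `Op`, `takeSendable` and the string region names are hypothetical and are not part of the HBase API; the real class also resolves region locations and applies task limits, which the sketch leaves out. It shows one behavior only: operations on non-busy regions are taken out of the caller's list, and the rest stay in it for a later call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Toy model only: Op, takeSendable and the region names are hypothetical,
// they are not part of the HBase API.
class SubmitSketch {
    /** A pending operation together with the (hypothetical) region hosting its row. */
    static final class Op {
        final String region;
        final String row;

        Op(String region, String row) {
            this.region = region;
            this.row = row;
        }
    }

    /**
     * Removes from ops every operation whose region is not busy and returns
     * them in their original order. Operations on busy regions stay in ops.
     */
    static List<Op> takeSendable(List<Op> ops, Set<String> busyRegions) {
        List<Op> taken = new ArrayList<>();
        for (Iterator<Op> it = ops.iterator(); it.hasNext();) {
            Op op = it.next();
            if (!busyRegions.contains(op.region)) {
                taken.add(op);
                it.remove(); // the caller's list is modified in place
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        List<Op> buffer = new ArrayList<>();
        buffer.add(new Op("r1", "a"));
        buffer.add(new Op("r2", "b"));
        buffer.add(new Op("r1", "c"));
        // Region r2 is considered busy, so its operation is left in the buffer.
        List<Op> sent = takeSendable(buffer, Collections.singleton("r2"));
        System.out.println("sent=" + sent.size() + " left=" + buffer.size()); // sent=2 left=1
    }
}
```

A caller built on this model would call `takeSendable` again later with the same buffer, which mirrors how the rows list passed to submit keeps only the rows that were not taken.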
Modifier and Type | Class and Description |
---|---|
static interface | AsyncProcess.AsyncRequestFuture: The context used to wait for results from one submit call. |
protected class | AsyncProcess.AsyncRequestFutureImpl<CResult>: The context, and return value, for a single submit/submitAll call. |
protected static class | AsyncProcess.BatchErrors |
private static class | AsyncProcess.ReplicaResultState: Sync point for calls to multiple replicas for the same user request (Get). |
private static class | AsyncProcess.Retry: For manageError. |
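To make the idea of a per-call context with partial results more concrete, here is a minimal sketch. It is an assumption-laden illustration, not the `AsyncRequestFutureImpl` implementation: the class `RequestFutureSketch` and all of its methods are hypothetical. Each action gets one slot in a results array; a slot ends up holding either a value or the `Throwable` that made that action fail, so successful slots remain readable when other slots failed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a future-like context; not the HBase class.
class RequestFutureSketch {
    private final Object[] results;
    private final CountDownLatch remaining;
    private volatile boolean error;

    RequestFutureSketch(int actionCount) {
        this.results = new Object[actionCount];
        this.remaining = new CountDownLatch(actionCount);
    }

    /** Records a successful result for the action at index. */
    void setResult(int index, Object value) {
        synchronized (results) {
            results[index] = value;
        }
        remaining.countDown();
    }

    /** Records a failure for the action at index; other slots are unaffected. */
    void setError(int index, Throwable cause) {
        error = true;
        synchronized (results) {
            results[index] = cause;
        }
        remaining.countDown();
    }

    /** Waits until every action has a result or an error; false on timeout or interrupt. */
    boolean waitUntilDone(long timeoutMillis) {
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean hasError() {
        return error;
    }

    /** Returns a snapshot; slots of failed actions hold the Throwable. */
    Object[] getResults() {
        synchronized (results) {
            return results.clone();
        }
    }

    public static void main(String[] args) {
        RequestFutureSketch future = new RequestFutureSketch(2);
        future.setResult(0, "ok");
        future.setError(1, new RuntimeException("region unavailable"));
        System.out.println(future.waitUntilDone(1000) + " " + future.hasError());
    }
}
```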
Modifier and Type | Field and Description |
---|---|
protected ClusterConnection | connection |
protected static AtomicLong | COUNTER |
static int | DEFAULT_START_LOG_ERRORS_AFTER_COUNT |
private static int | DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS |
protected AsyncProcess.BatchErrors | globalErrors |
protected long | id |
private static org.apache.commons.logging.Log | LOG |
static String | LOG_DETAILS_FOR_BATCH_ERROR: Configuration to decide whether to log details for batch error. |
protected int | maxConcurrentTasksPerRegion: The number of tasks we run in parallel on a single region. |
protected int | maxConcurrentTasksPerServer: The number of tasks simultaneously executed on a single region server. |
protected int | maxTotalConcurrentTasks: The number of tasks simultaneously executed on the cluster. |
private static AsyncProcess.AsyncRequestFuture | NO_REQS_RESULT: Return value from a submit that didn't contain any requests. |
protected int | numTries |
protected long | pause |
protected ExecutorService | pool |
static String | PRIMARY_CALL_TIMEOUT_KEY |
protected long | primaryCallTimeoutMicroseconds |
protected RpcRetryingCallerFactory | rpcCallerFactory |
protected RpcControllerFactory | rpcFactory |
protected int | serverTrackerTimeout |
static String | START_LOG_ERRORS_AFTER_COUNT_KEY: Configure the number of failures after which the client will start logging. |
private int | startLogErrorsCnt |
protected ConcurrentMap<byte[],AtomicInteger> | taskCounterPerRegion |
protected ConcurrentMap<ServerName,AtomicInteger> | taskCounterPerServer |
protected AtomicLong | tasksInProgress |
private int | THRESHOLD_TO_LOG_REGION_DETAILS |
private static String | THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS |
private int | thresholdToLogUndoneTaskDetails |
protected int | timeout |
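The three limits and the three counters listed above suggest a simple admission check. The following is a minimal sketch of that idea, assuming one task touches exactly one region on one server and using `String` keys in place of `byte[]` region names and `ServerName`. The class `TaskLimitsSketch` and its methods are hypothetical; the check and the increment are not atomic together, which a real implementation would have to consider.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Toy model: String keys stand in for region names (byte[]) and ServerName.
class TaskLimitsSketch {
    private final int maxTotal;
    private final int maxPerServer;
    private final int maxPerRegion;
    private final AtomicLong tasksInProgress = new AtomicLong();
    private final ConcurrentMap<String, AtomicInteger> perRegion = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, AtomicInteger> perServer = new ConcurrentHashMap<>();

    TaskLimitsSketch(int maxTotal, int maxPerServer, int maxPerRegion) {
        this.maxTotal = maxTotal;
        this.maxPerServer = maxPerServer;
        this.maxPerRegion = maxPerRegion;
    }

    private static int count(ConcurrentMap<String, AtomicInteger> counters, String key) {
        AtomicInteger counter = counters.get(key);
        return counter == null ? 0 : counter.get();
    }

    /** True if one more task for this region and server stays within all three limits. */
    boolean canTake(String region, String server) {
        return tasksInProgress.get() < maxTotal
            && count(perServer, server) < maxPerServer
            && count(perRegion, region) < maxPerRegion;
    }

    /** Registers a started task. */
    void inc(String region, String server) {
        tasksInProgress.incrementAndGet();
        perServer.computeIfAbsent(server, k -> new AtomicInteger()).incrementAndGet();
        perRegion.computeIfAbsent(region, k -> new AtomicInteger()).incrementAndGet();
    }

    /** Registers a finished task; assumes a matching inc happened earlier. */
    void dec(String region, String server) {
        perRegion.get(region).decrementAndGet();
        perServer.get(server).decrementAndGet();
        tasksInProgress.decrementAndGet();
    }

    public static void main(String[] args) {
        TaskLimitsSketch limits = new TaskLimitsSketch(3, 2, 1);
        limits.inc("r1", "s1");
        System.out.println(limits.canTake("r1", "s1")); // false: r1 is at its limit of 1
        System.out.println(limits.canTake("r2", "s1")); // true: s1 runs 1 of 2 tasks
    }
}
```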
Constructor and Description |
---|
AsyncProcess(ClusterConnection hc, org.apache.hadoop.conf.Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) |
Modifier and Type | Method and Description |
---|---|
private static void | addAction(ServerName server, byte[] regionName, Action<Row> action, Map<ServerName,MultiAction<Row>> actionsByServer, long nonceGroup): Helper that is used when grouping the actions per region server. |
protected boolean | canTakeOperation(HRegionLocation loc, Map<HRegionInfo,Boolean> regionsIncluded, Map<ServerName,Boolean> serversIncluded): Check if we should send new operations to this region or region server. |
protected <CResult> AsyncProcess.AsyncRequestFutureImpl<CResult> | createAsyncRequestFuture(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults) |
protected MultiServerCallable<Row> | createCallable(ServerName server, TableName tableName, MultiAction<Row> multi): Create a callable. |
protected RpcRetryingCaller<MultiResponse> | createCaller(MultiServerCallable<Row> callable): Create a caller. |
protected ConnectionManager.ServerErrorTracker | createServerErrorTracker(): Creates the server error tracker to use inside process. |
protected void | decTaskCounters(Collection<byte[]> regions, ServerName sn): Decrements the counters for a given region and the region server. |
private ExecutorService | getPool(ExecutorService pool) |
boolean | hasError(): Only used w/useGlobalErrors ctor argument, for HTable backward compat. |
protected void | incTaskCounters(Collection<byte[]> regions, ServerName sn): Increments the task counters for a given set of regions. |
private static boolean | isReplicaGet(Row row) |
private void | logDetailsOfUndoneTasks(long taskInProgress) |
private static void | setNonce(NonceGenerator ng, Row r, Action<Row> action) |
<CResult> AsyncProcess.AsyncRequestFuture | submit(ExecutorService pool, TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults): Extract from the rows list what we can submit. |
<CResult> AsyncProcess.AsyncRequestFuture | submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) |
<CResult> AsyncProcess.AsyncRequestFuture | submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results): Submit immediately the list of rows, whatever the server status. |
<CResult> AsyncProcess.AsyncRequestFuture | submitAll(TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) |
(package private) <CResult> AsyncProcess.AsyncRequestFuture | submitMultiActions(TableName tableName, List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, Object[] results, boolean needResults, List<Exception> locationErrors, List<Integer> locationErrorRows, Map<ServerName,MultiAction<Row>> actionsByServer, ExecutorService pool) |
RetriesExhaustedWithDetailsException | waitForAllPreviousOpsAndReset(List<Row> failedRows, String tableName): Only used w/useGlobalErrors ctor argument, for HTable backward compat. |
(package private) void | waitForMaximumCurrentTasks(int max, AtomicLong tasksInProgress, long id, String tableName) |
private void | waitForMaximumCurrentTasks(int max, String tableName): Wait until the async does not have more than max tasks in progress. |
(package private) void | waitUntilDone() |
private static final org.apache.commons.logging.Log LOG
protected static final AtomicLong COUNTER
public static final String PRIMARY_CALL_TIMEOUT_KEY
public static final String START_LOG_ERRORS_AFTER_COUNT_KEY
public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT
public static final String LOG_DETAILS_FOR_BATCH_ERROR
private final int thresholdToLogUndoneTaskDetails
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS
private final int THRESHOLD_TO_LOG_REGION_DETAILS
private static final AsyncProcess.AsyncRequestFuture NO_REQS_RESULT
protected final long id
protected final ClusterConnection connection
protected final RpcRetryingCallerFactory rpcCallerFactory
protected final RpcControllerFactory rpcFactory
protected final AsyncProcess.BatchErrors globalErrors
protected final ExecutorService pool
protected final AtomicLong tasksInProgress
protected final ConcurrentMap<byte[],AtomicInteger> taskCounterPerRegion
protected final ConcurrentMap<ServerName,AtomicInteger> taskCounterPerServer
private final int startLogErrorsCnt
protected final int maxTotalConcurrentTasks
protected final int maxConcurrentTasksPerRegion
protected final int maxConcurrentTasksPerServer
protected final long pause
protected int numTries
protected int serverTrackerTimeout
protected int timeout
protected long primaryCallTimeoutMicroseconds
public AsyncProcess(ClusterConnection hc, org.apache.hadoop.conf.Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory)
private ExecutorService getPool(ExecutorService pool)
public <CResult> AsyncProcess.AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException
See submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean). Uses default ExecutorService for this AP (must have been created with one).
Throws: InterruptedIOException
public <CResult> AsyncProcess.AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) throws InterruptedIOException
pool - ExecutorService to use.
tableName - The table for which this request is needed.
callback - Batch callback. Only called on success (94 behavior).
needResults - Whether results are needed, or can be discarded.
rows - the submitted rows. Modified by the method: we remove the rows we took.
atLeastOne - true if we should submit at least a subset.
Throws: InterruptedIOException
<CResult> AsyncProcess.AsyncRequestFuture submitMultiActions(TableName tableName, List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, Object[] results, boolean needResults, List<Exception> locationErrors, List<Integer> locationErrorRows, Map<ServerName,MultiAction<Row>> actionsByServer, ExecutorService pool)
private static void addAction(ServerName server, byte[] regionName, Action<Row> action, Map<ServerName,MultiAction<Row>> actionsByServer, long nonceGroup)
loc - the destination. Must not be null.
action - the action to add to the multiaction.
actionsByServer - the multiaction per server.
nonceGroup - Nonce group.
protected boolean canTakeOperation(HRegionLocation loc, Map<HRegionInfo,Boolean> regionsIncluded, Map<ServerName,Boolean> serversIncluded)
loc - the region and the server name we want to use.
public <CResult> AsyncProcess.AsyncRequestFuture submitAll(TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results)
See submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[]). Uses default ExecutorService for this AP (must have been created with one).
public <CResult> AsyncProcess.AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results)
pool - ExecutorService to use.
tableName - name of the table for which the submission is made.
rows - the list of rows.
callback - the callback.
results - Optional array to return the results through; backward compat.
private static void setNonce(NonceGenerator ng, Row r, Action<Row> action)
protected <CResult> AsyncProcess.AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults)
protected MultiServerCallable<Row> createCallable(ServerName server, TableName tableName, MultiAction<Row> multi)
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable)
void waitUntilDone() throws InterruptedIOException
Throws: InterruptedIOException
private void waitForMaximumCurrentTasks(int max, String tableName) throws InterruptedIOException
Throws: InterruptedIOException
void waitForMaximumCurrentTasks(int max, AtomicLong tasksInProgress, long id, String tableName) throws InterruptedIOException
Throws: InterruptedIOException
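Waiting until no more than max tasks are in progress can be sketched with a counter and a monitor. This is an illustration under assumptions, not the HBase code: `WaitForTasksSketch`, `taskStarted`, `taskFinished` and `awaitAtMost` are hypothetical names, and the sketch reports an interrupt through a boolean return value where the real methods throw InterruptedIOException.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of "block until at most max tasks are in progress".
class WaitForTasksSketch {
    private final AtomicLong tasksInProgress = new AtomicLong();

    void taskStarted() {
        tasksInProgress.incrementAndGet();
    }

    /** Decrements the counter, then wakes up any thread waiting on it. */
    void taskFinished() {
        tasksInProgress.decrementAndGet();
        synchronized (tasksInProgress) {
            tasksInProgress.notifyAll();
        }
    }

    long inProgress() {
        return tasksInProgress.get();
    }

    /**
     * Blocks until at most max tasks are in progress.
     * Returns false if the waiting thread was interrupted.
     */
    boolean awaitAtMost(int max) {
        synchronized (tasksInProgress) {
            while (tasksInProgress.get() > max) {
                try {
                    // Timed wait: re-check periodically even without a notification.
                    tasksInProgress.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WaitForTasksSketch ap = new WaitForTasksSketch();
        ap.taskStarted();
        ap.taskStarted();
        // A worker thread completes both tasks while the main thread waits.
        new Thread(() -> {
            ap.taskFinished();
            ap.taskFinished();
        }).start();
        boolean completed = ap.awaitAtMost(0);
        System.out.println("completed=" + completed + " inProgress=" + ap.inProgress());
    }
}
```

Calling `awaitAtMost(0)` corresponds to waiting for everything to finish, while a larger value only waits until enough slots are free for a new submission.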
private void logDetailsOfUndoneTasks(long taskInProgress)
public boolean hasError()
Returns whether there were any errors in any request since the last time waitForAllPreviousOpsAndReset(List) was called, or AP was created.
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(List<Row> failedRows, String tableName) throws InterruptedIOException
failedRows - an optional list into which the rows that failed since the last time waitForAllPreviousOpsAndReset(List) was called, or AP was created, are saved.
tableName - name of the table
Returns: the errors since the last time waitForAllPreviousOpsAndReset(List) was called, or AP was created.
Throws: InterruptedIOException
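The "since the last time it was called" contract of global error mode can be illustrated with a small accumulator. This is a sketch only: `GlobalErrorsSketch` and its methods are hypothetical and do not reproduce `AsyncProcess.BatchErrors`, rows are plain strings, and the step of waiting for outstanding operations is left out. Errors from any number of submissions pile up in one place, and a single call hands them back and resets the state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator showing the collect-then-reset contract.
class GlobalErrorsSketch {
    private final List<Throwable> errors = new ArrayList<>();
    private final List<String> failedRows = new ArrayList<>();

    /** Records one failed row together with its cause. */
    synchronized void add(Throwable cause, String row) {
        errors.add(cause);
        failedRows.add(row);
    }

    /** True if any error was recorded since the last reset. */
    synchronized boolean hasError() {
        return !errors.isEmpty();
    }

    /**
     * Returns the errors recorded since the last reset and clears the state.
     * If failedRowsOut is not null, the failed rows are appended to it.
     */
    synchronized List<Throwable> drainAndReset(List<String> failedRowsOut) {
        if (failedRowsOut != null) {
            failedRowsOut.addAll(failedRows);
        }
        List<Throwable> snapshot = new ArrayList<>(errors);
        errors.clear();
        failedRows.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        GlobalErrorsSketch global = new GlobalErrorsSketch();
        global.add(new RuntimeException("first submission failed"), "row-1");
        global.add(new RuntimeException("second submission failed"), "row-7");
        List<String> failed = new ArrayList<>();
        List<Throwable> seen = global.drainAndReset(failed);
        System.out.println(seen.size() + " errors, rows " + failed + ", hasError=" + global.hasError());
    }
}
```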
protected void incTaskCounters(Collection<byte[]> regions, ServerName sn)
protected void decTaskCounters(Collection<byte[]> regions, ServerName sn)
protected ConnectionManager.ServerErrorTracker createServerErrorTracker()
private static boolean isReplicaGet(Row row)
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.