001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import java.util.Iterator;
023import java.util.List;
024import java.util.concurrent.ExecutorService;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.apache.hadoop.hbase.client.coprocessor.Batch;
029
030/**
031 * Contains the attributes of a task which will be executed
032 * by {@link org.apache.hadoop.hbase.client.AsyncProcess}.
033 * The attributes will be validated by AsyncProcess.
034 * It's intended for advanced client applications.
035 * @param <T> The type of response from server-side
036 */
037@InterfaceAudience.Private
038@InterfaceStability.Evolving
039public class AsyncProcessTask<T> {
040  /**
041   * The number of processed rows.
042   * The AsyncProcess has traffic control which may reject some rows.
043   */
044  public enum SubmittedRows {
045    ALL,
046    AT_LEAST_ONE,
047    NORMAL
048  }
049  public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) {
050    return new Builder<>(callback);
051  }
052  public static Builder newBuilder() {
053    return new Builder();
054  }
055
056  public static class Builder<T> {
057
058    private ExecutorService pool;
059    private TableName tableName;
060    private RowAccess<? extends Row> rows;
061    private SubmittedRows submittedRows = SubmittedRows.ALL;
062    private Batch.Callback<T> callback;
063    private boolean needResults;
064    private int rpcTimeout;
065    private int operationTimeout;
066    private CancellableRegionServerCallable callable;
067    private Object[] results;
068
069    private Builder() {
070    }
071
072    private Builder(Batch.Callback<T> callback) {
073      this.callback = callback;
074    }
075
076    Builder<T> setResults(Object[] results) {
077      this.results = results;
078      if (results != null && results.length != 0) {
079        setNeedResults(true);
080      }
081      return this;
082    }
083
084    public Builder<T> setPool(ExecutorService pool) {
085      this.pool = pool;
086      return this;
087    }
088
089    public Builder<T> setRpcTimeout(int rpcTimeout) {
090      this.rpcTimeout = rpcTimeout;
091      return this;
092    }
093
094    public Builder<T> setOperationTimeout(int operationTimeout) {
095      this.operationTimeout = operationTimeout;
096      return this;
097    }
098
099    public Builder<T> setTableName(TableName tableName) {
100      this.tableName = tableName;
101      return this;
102    }
103
104    public Builder<T> setRowAccess(List<? extends Row> rows) {
105      this.rows = new ListRowAccess<>(rows);
106      return this;
107    }
108
109    public Builder<T> setRowAccess(RowAccess<? extends Row> rows) {
110      this.rows = rows;
111      return this;
112    }
113
114    public Builder<T> setSubmittedRows(SubmittedRows submittedRows) {
115      this.submittedRows = submittedRows;
116      return this;
117    }
118
119    public Builder<T> setNeedResults(boolean needResults) {
120      this.needResults = needResults;
121      return this;
122    }
123
124    Builder<T> setCallable(CancellableRegionServerCallable callable) {
125      this.callable = callable;
126      return this;
127    }
128
129    public AsyncProcessTask<T> build() {
130      return new AsyncProcessTask<>(pool, tableName, rows, submittedRows,
131              callback, callable, needResults, rpcTimeout, operationTimeout, results);
132    }
133  }
134  private final ExecutorService pool;
135  private final TableName tableName;
136  private final RowAccess<? extends Row> rows;
137  private final SubmittedRows submittedRows;
138  private final Batch.Callback<T> callback;
139  private final CancellableRegionServerCallable callable;
140  private final boolean needResults;
141  private final int rpcTimeout;
142  private final int operationTimeout;
143  private final Object[] results;
144  AsyncProcessTask(AsyncProcessTask<T> task) {
145    this(task.getPool(), task.getTableName(), task.getRowAccess(),
146        task.getSubmittedRows(), task.getCallback(), task.getCallable(),
147        task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(),
148        task.getResults());
149  }
150  AsyncProcessTask(ExecutorService pool, TableName tableName,
151          RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback<T> callback,
152          CancellableRegionServerCallable callable, boolean needResults,
153          int rpcTimeout, int operationTimeout, Object[] results) {
154    this.pool = pool;
155    this.tableName = tableName;
156    this.rows = rows;
157    this.submittedRows = size;
158    this.callback = callback;
159    this.callable = callable;
160    this.needResults = needResults;
161    this.rpcTimeout = rpcTimeout;
162    this.operationTimeout = operationTimeout;
163    this.results = results;
164  }
165
166  public int getOperationTimeout() {
167    return operationTimeout;
168  }
169
170  public ExecutorService getPool() {
171    return pool;
172  }
173
174  public TableName getTableName() {
175    return tableName;
176  }
177
178  public RowAccess<? extends Row> getRowAccess() {
179    return rows;
180  }
181
182  public SubmittedRows getSubmittedRows() {
183    return submittedRows;
184  }
185
186  public Batch.Callback<T> getCallback() {
187    return callback;
188  }
189
190  CancellableRegionServerCallable getCallable() {
191    return callable;
192  }
193
194  Object[] getResults() {
195    return results;
196  }
197
198  public boolean getNeedResults() {
199    return needResults;
200  }
201
202  public int getRpcTimeout() {
203    return rpcTimeout;
204  }
205
206  static class ListRowAccess<T> implements RowAccess<T> {
207
208    private final List<T> data;
209
210    ListRowAccess(final List<T> data) {
211      this.data = data;
212    }
213
214    @Override
215    public int size() {
216      return data.size();
217    }
218
219    @Override
220    public boolean isEmpty() {
221      return data.isEmpty();
222    }
223
224    @Override
225    public Iterator<T> iterator() {
226      return data.iterator();
227    }
228  }
229}