001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
109import org.apache.hbase.thirdparty.com.google.protobuf.Message;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
113import org.apache.hbase.thirdparty.io.netty.util.Timeout;
114import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
361import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
362import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
363import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
364
365/**
366 * The implementation of AsyncAdmin.
367 * <p>
368 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
369 * be finished inside the rpc framework thread, which means that the callbacks registered to the
370 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
371 * this class should not try to do time consuming tasks in the callbacks.
372 * @since 2.0.0
373 * @see AsyncHBaseAdmin
374 * @see AsyncConnection#getAdmin()
375 * @see AsyncConnection#getAdminBuilder()
376 */
377@InterfaceAudience.Private
378class RawAsyncHBaseAdmin implements AsyncAdmin {
379
380  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
381
382  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
383
384  private final AsyncConnectionImpl connection;
385
386  private final HashedWheelTimer retryTimer;
387
388  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
389
390  private final long rpcTimeoutNs;
391
392  private final long operationTimeoutNs;
393
394  private final long pauseNs;
395
396  private final long pauseNsForServerOverloaded;
397
398  private final int maxAttempts;
399
400  private final int startLogErrorsCnt;
401
402  private final NonceGenerator ng;
403
404  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
405    AsyncAdminBuilderBase builder) {
406    this.connection = connection;
407    this.retryTimer = retryTimer;
408    this.metaTable = connection.getTable(META_TABLE_NAME);
409    this.rpcTimeoutNs = builder.rpcTimeoutNs;
410    this.operationTimeoutNs = builder.operationTimeoutNs;
411    this.pauseNs = builder.pauseNs;
412    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
413      LOG.warn(
414        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
415          + " the normal pause value {} ms, use the greater one instead",
416        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
417        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
418      this.pauseNsForServerOverloaded = builder.pauseNs;
419    } else {
420      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
421    }
422    this.maxAttempts = builder.maxAttempts;
423    this.startLogErrorsCnt = builder.startLogErrorsCnt;
424    this.ng = connection.getNonceGenerator();
425  }
426
427  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
428    return this.connection.callerFactory.<T> masterRequest()
429      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
430      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
431      .pause(pauseNs, TimeUnit.NANOSECONDS)
432      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
433      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
434  }
435
436  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
437    return this.connection.callerFactory.<T> adminRequest()
438      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
439      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
440      .pause(pauseNs, TimeUnit.NANOSECONDS)
441      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
442      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
443  }
444
445  @FunctionalInterface
446  private interface MasterRpcCall<RESP, REQ> {
447    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
448      RpcCallback<RESP> done);
449  }
450
451  @FunctionalInterface
452  private interface AdminRpcCall<RESP, REQ> {
453    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
454      RpcCallback<RESP> done);
455  }
456
457  @FunctionalInterface
458  private interface Converter<D, S> {
459    D convert(S src) throws IOException;
460  }
461
462  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
463    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
464    Converter<RESP, PRESP> respConverter) {
465    CompletableFuture<RESP> future = new CompletableFuture<>();
466    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
467
468      @Override
469      public void run(PRESP resp) {
470        if (controller.failed()) {
471          future.completeExceptionally(controller.getFailed());
472        } else {
473          try {
474            future.complete(respConverter.convert(resp));
475          } catch (IOException e) {
476            future.completeExceptionally(e);
477          }
478        }
479      }
480    });
481    return future;
482  }
483
484  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
485    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
486    Converter<RESP, PRESP> respConverter) {
487    CompletableFuture<RESP> future = new CompletableFuture<>();
488    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
489
490      @Override
491      public void run(PRESP resp) {
492        if (controller.failed()) {
493          future.completeExceptionally(controller.getFailed());
494        } else {
495          try {
496            future.complete(respConverter.convert(resp));
497          } catch (IOException e) {
498            future.completeExceptionally(e);
499          }
500        }
501      }
502    });
503    return future;
504  }
505
506  /**
507   * short-circuit call for
508   * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)}
509   * by ignoring procedure result
510   */
511  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
512    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
513    ProcedureBiConsumer<Void> consumer) {
514    return procedureCall(preq, rpcCall, respConverter, result -> null, consumer);
515  }
516
517  /**
518   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
519   * ProcedureBiConsumer) by skip setting priority for request
520   */
521  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
522    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
523    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
524    return procedureCall(b -> {
525    }, preq, rpcCall, respConverter, resultConverter, consumer);
526  }
527
528  /**
529   * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter,
530   * ProcedureBiConsumer) by ignoring procedure result
531   */
532  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
533    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
534    ProcedureBiConsumer<Void> consumer) {
535    return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer);
536  }
537
538  /**
539   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
540   * ProcedureBiConsumer) by skip setting priority for request
541   */
542  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq,
543    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
544    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
545    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter,
546      consumer);
547  }
548
549  /**
550   * @param <PREQ>          type of request
551   * @param <PRESP>         type of response
552   * @param <PRES>          type of procedure call result
553   * @param prioritySetter  prioritySetter set priority by table for request
554   * @param preq            procedure call request
555   * @param rpcCall         procedure rpc call
556   * @param respConverter   extract proc id from procedure call response
557   * @param resultConverter extract result from procedure call result
558   * @param consumer        action performs on result
559   * @return procedure call result, null if procedure is void
560   */
561  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(
562    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
563    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
564    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
565    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller()
566      .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter));
567    prioritySetter.accept(builder);
568    CompletableFuture<Long> procFuture = builder.call();
569    CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter);
570    addListener(future, consumer);
571    return future;
572  }
573
574  @Override
575  public CompletableFuture<Boolean> tableExists(TableName tableName) {
576    if (TableName.isMetaTableName(tableName)) {
577      return CompletableFuture.completedFuture(true);
578    }
579    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
580  }
581
582  @Override
583  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
584    return getTableDescriptors(
585      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
586  }
587
588  /**
589   * {@link #listTableDescriptors(boolean)}
590   */
591  @Override
592  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
593    boolean includeSysTables) {
594    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
595      + "use listTableDescriptors(boolean) instead");
596    return getTableDescriptors(
597      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
598  }
599
600  @Override
601  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
602    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
603      + "use listTableDescriptors(boolean) instead");
604    if (tableNames.isEmpty()) {
605      return CompletableFuture.completedFuture(Collections.emptyList());
606    }
607    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
608  }
609
610  private CompletableFuture<List<TableDescriptor>>
611    getTableDescriptors(GetTableDescriptorsRequest request) {
612    return this.<List<TableDescriptor>> newMasterCaller()
613      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
614        List<TableDescriptor>> call(controller, stub, request,
615          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
616          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
617      .call();
618  }
619
620  @Override
621  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
622    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
623  }
624
625  @Override
626  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
627    boolean includeSysTables) {
628    Preconditions.checkNotNull(pattern,
629      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
630    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
631  }
632
633  @Override
634  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
635    return this.<List<TableName>> newMasterCaller()
636      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
637        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
638          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
639          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
640          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
641      .call();
642  }
643
644  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
645    return this.<List<TableName>> newMasterCaller()
646      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
647        List<TableName>> call(controller, stub, request,
648          (s, c, req, done) -> s.getTableNames(c, req, done),
649          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
650      .call();
651  }
652
653  @Override
654  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
655    return this.<List<TableDescriptor>> newMasterCaller()
656      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
657        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
658          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
659          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
660          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
661      .call();
662  }
663
664  @Override
665  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
666    return this.<List<TableDescriptor>> newMasterCaller()
667      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
668        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
669          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
670          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
671          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
672      .call();
673  }
674
675  @Override
676  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
677    return this.<List<TableName>> newMasterCaller()
678      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
679        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
680          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
681          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
682          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
683      .call();
684  }
685
686  @Override
687  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
688    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
689    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
690      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
691        List<TableSchema>> call(controller, stub,
692          RequestConverter.buildGetTableDescriptorsRequest(tableName),
693          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
694          (resp) -> resp.getTableSchemaList()))
695      .call(), (tableSchemas, error) -> {
696        if (error != null) {
697          future.completeExceptionally(error);
698          return;
699        }
700        if (!tableSchemas.isEmpty()) {
701          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
702        } else {
703          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
704        }
705      });
706    return future;
707  }
708
709  @Override
710  public CompletableFuture<Void> createTable(TableDescriptor desc) {
711    return createTable(desc.getTableName(),
712      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
713  }
714
715  @Override
716  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
717    int numRegions) {
718    try {
719      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
720    } catch (IllegalArgumentException e) {
721      return failedFuture(e);
722    }
723  }
724
725  @Override
726  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
727    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
728      + " use createTable(TableDescriptor) instead");
729    try {
730      verifySplitKeys(splitKeys);
731      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
732        splitKeys, ng.getNonceGroup(), ng.newNonce()));
733    } catch (IllegalArgumentException e) {
734      return failedFuture(e);
735    }
736  }
737
738  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
739    Preconditions.checkNotNull(tableName, "table name is null");
740    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
741      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
742      new CreateTableProcedureBiConsumer(tableName));
743  }
744
745  @Override
746  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
747    return modifyTable(desc, true);
748  }
749
750  @Override
751  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
752    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
753      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
754        ng.newNonce(), reopenRegions),
755      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
756      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
757  }
758
759  @Override
760  public CompletableFuture<Void> reopenTableRegions(TableName tableName) {
761    return reopenTableRegions(tableName, Collections.emptyList());
762  }
763
764  @Override
765  public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) {
766    List<byte[]> regionNames = regions.stream().map(RegionInfo::getRegionName).toList();
767    return this.<ReopenTableRegionsRequest, ReopenTableRegionsResponse> procedureCall(tableName,
768      RequestConverter.buildReopenTableRegionsRequest(tableName, regionNames, ng.getNonceGroup(),
769        ng.newNonce()),
770      (s, c, req, done) -> s.reopenTableRegions(c, req, done), (resp) -> resp.getProcId(),
771      new ReopenTableRegionsProcedureBiConsumer(this, tableName));
772  }
773
774  @Override
775  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
776    return this.<ModifyTableStoreFileTrackerRequest,
777      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
778        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
779          ng.getNonceGroup(), ng.newNonce()),
780        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
781        (resp) -> resp.getProcId(),
782        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
783  }
784
785  @Override
786  public CompletableFuture<Void> deleteTable(TableName tableName) {
787    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
788      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
789      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
790      new DeleteTableProcedureBiConsumer(tableName));
791  }
792
793  @Override
794  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
795    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
796      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
797        ng.newNonce()),
798      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
799      new TruncateTableProcedureBiConsumer(tableName));
800  }
801
802  @Override
803  public CompletableFuture<Void> enableTable(TableName tableName) {
804    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
805      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
806      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
807      new EnableTableProcedureBiConsumer(tableName));
808  }
809
810  @Override
811  public CompletableFuture<Void> disableTable(TableName tableName) {
812    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
813      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
814      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
815      new DisableTableProcedureBiConsumer(tableName));
816  }
817
818  /**
819   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
820   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
821   * targetState).
822   */
823  private static CompletableFuture<Boolean> completeCheckTableState(
824    CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error,
825    TableState.State targetState, TableName tableName) {
826    if (error != null) {
827      future.completeExceptionally(error);
828    } else {
829      if (tableState.isPresent()) {
830        future.complete(tableState.get().inStates(targetState));
831      } else {
832        future.completeExceptionally(new TableNotFoundException(tableName));
833      }
834    }
835    return future;
836  }
837
838  @Override
839  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
840    if (TableName.isMetaTableName(tableName)) {
841      return CompletableFuture.completedFuture(true);
842    }
843    CompletableFuture<Boolean> future = new CompletableFuture<>();
844    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
845      (tableState, error) -> {
846        completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName);
847      });
848    return future;
849  }
850
851  @Override
852  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
853    if (TableName.isMetaTableName(tableName)) {
854      return CompletableFuture.completedFuture(false);
855    }
856    CompletableFuture<Boolean> future = new CompletableFuture<>();
857    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
858      (tableState, error) -> {
859        completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName);
860      });
861    return future;
862  }
863
864  @Override
865  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
866    if (TableName.isMetaTableName(tableName)) {
867      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
868        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
869    }
870    CompletableFuture<Boolean> future = new CompletableFuture<>();
871    addListener(isTableEnabled(tableName), (enabled, error) -> {
872      if (error != null) {
873        if (error instanceof TableNotFoundException) {
874          future.complete(false);
875        } else {
876          future.completeExceptionally(error);
877        }
878        return;
879      }
880      if (!enabled) {
881        future.complete(false);
882      } else {
883        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
884          (locations, error1) -> {
885            if (error1 != null) {
886              future.completeExceptionally(error1);
887              return;
888            }
889            List<HRegionLocation> notDeployedRegions = locations.stream()
890              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
891            if (notDeployedRegions.size() > 0) {
892              if (LOG.isDebugEnabled()) {
893                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
894              }
895              future.complete(false);
896              return;
897            }
898            future.complete(true);
899          });
900      }
901    });
902    return future;
903  }
904
905  @Override
906  public CompletableFuture<Void> addColumnFamily(TableName tableName,
907    ColumnFamilyDescriptor columnFamily) {
908    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
909      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
910        ng.newNonce()),
911      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
912      new AddColumnFamilyProcedureBiConsumer(tableName));
913  }
914
915  @Override
916  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
917    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
918      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
919        ng.newNonce()),
920      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
921      new DeleteColumnFamilyProcedureBiConsumer(tableName));
922  }
923
924  @Override
925  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
926    ColumnFamilyDescriptor columnFamily) {
927    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
928      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
929        ng.newNonce()),
930      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
931      new ModifyColumnFamilyProcedureBiConsumer(tableName));
932  }
933
934  @Override
935  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
936    byte[] family, String dstSFT) {
937    return this.<ModifyColumnStoreFileTrackerRequest,
938      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
939        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
940          ng.getNonceGroup(), ng.newNonce()),
941        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
942        (resp) -> resp.getProcId(),
943        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
944  }
945
946  @Override
947  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
948    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
949      RequestConverter.buildCreateNamespaceRequest(descriptor),
950      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
951      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
952  }
953
954  @Override
955  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
956    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
957      RequestConverter.buildModifyNamespaceRequest(descriptor),
958      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
959      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
960  }
961
962  @Override
963  public CompletableFuture<Void> deleteNamespace(String name) {
964    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
965      RequestConverter.buildDeleteNamespaceRequest(name),
966      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
967      new DeleteNamespaceProcedureBiConsumer(name));
968  }
969
970  @Override
971  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
972    return this.<NamespaceDescriptor> newMasterCaller()
973      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
974        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
975          RequestConverter.buildGetNamespaceDescriptorRequest(name),
976          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
977          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
978      .call();
979  }
980
981  @Override
982  public CompletableFuture<List<String>> listNamespaces() {
983    return this.<List<String>> newMasterCaller()
984      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
985        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
986          (s, c, req, done) -> s.listNamespaces(c, req, done),
987          (resp) -> resp.getNamespaceNameList()))
988      .call();
989  }
990
991  @Override
992  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
993    return this.<List<NamespaceDescriptor>> newMasterCaller()
994      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
995        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
996          ListNamespaceDescriptorsRequest.newBuilder().build(),
997          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
998          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
999      .call();
1000  }
1001
1002  @Override
1003  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
1004    return this.<List<RegionInfo>> newAdminCaller()
1005      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
1006        List<RegionInfo>> adminCall(controller, stub,
1007          RequestConverter.buildGetOnlineRegionRequest(),
1008          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
1009          resp -> ProtobufUtil.getRegionInfos(resp)))
1010      .serverName(serverName).call();
1011  }
1012
1013  @Override
1014  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
1015    if (tableName.equals(META_TABLE_NAME)) {
1016      return connection.registry.getMetaRegionLocations()
1017        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
1018          .collect(Collectors.toList()));
1019    } else {
1020      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
1021        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
1022    }
1023  }
1024
1025  @Override
1026  public CompletableFuture<Void> flush(TableName tableName) {
1027    return flush(tableName, Collections.emptyList());
1028  }
1029
1030  @Override
1031  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
1032    Preconditions.checkNotNull(columnFamily,
1033      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1034    return flush(tableName, Collections.singletonList(columnFamily));
1035  }
1036
1037  @Override
1038  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
1039    // This is for keeping compatibility with old implementation.
1040    // If the server version is lower than the client version, it's possible that the
1041    // flushTable method is not present in the server side, if so, we need to fall back
1042    // to the old implementation.
1043    Preconditions.checkNotNull(columnFamilyList,
1044      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1045    List<byte[]> columnFamilies = columnFamilyList.stream()
1046      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
1047    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
1048      ng.getNonceGroup(), ng.newNonce());
1049    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
1050      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
1051      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
1052    CompletableFuture<Void> future = new CompletableFuture<>();
1053    addListener(procFuture, (ret, error) -> {
1054      if (error != null) {
1055        if (
1056          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
1057            || error instanceof NoSuchColumnFamilyException
1058        ) {
1059          future.completeExceptionally(error);
1060        } else if (error instanceof DoNotRetryIOException) {
1061          // usually this is caused by the method is not present on the server or
1062          // the hbase hadoop version does not match the running hadoop version.
1063          // if that happens, we need fall back to the old flush implementation.
1064          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1065          legacyFlush(future, tableName, columnFamilies);
1066        } else {
1067          future.completeExceptionally(error);
1068        }
1069      } else {
1070        future.complete(ret);
1071      }
1072    });
1073    return future;
1074  }
1075
1076  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1077    List<byte[]> columnFamilies) {
1078    addListener(tableExists(tableName), (exists, err) -> {
1079      if (err != null) {
1080        future.completeExceptionally(err);
1081      } else if (!exists) {
1082        future.completeExceptionally(new TableNotFoundException(tableName));
1083      } else {
1084        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1085          if (err2 != null) {
1086            future.completeExceptionally(err2);
1087          } else if (!tableEnabled) {
1088            future.completeExceptionally(new TableNotEnabledException(tableName));
1089          } else {
1090            Map<String, String> props = new HashMap<>();
1091            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1092              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1093                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1094            }
1095            addListener(
1096              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1097              (ret, err3) -> {
1098                if (err3 != null) {
1099                  future.completeExceptionally(err3);
1100                } else {
1101                  future.complete(ret);
1102                }
1103              });
1104          }
1105        });
1106      }
1107    });
1108  }
1109
1110  @Override
1111  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1112    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1113    });
1114  }
1115
1116  @Override
1117  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1118    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1119      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1120    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1121    });
1122  }
1123
1124  /**
1125   * This method is for internal use only, where we need the response of the flush.
1126   * <p/>
1127   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1128   * API.
1129   */
1130  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1131    boolean writeFlushWALMarker) {
1132    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1133    addListener(getRegionLocation(regionName), (location, err) -> {
1134      if (err != null) {
1135        future.completeExceptionally(err);
1136        return;
1137      }
1138      ServerName serverName = location.getServerName();
1139      if (serverName == null) {
1140        future
1141          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1142        return;
1143      }
1144      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1145        (ret, err2) -> {
1146          if (err2 != null) {
1147            future.completeExceptionally(err2);
1148          } else {
1149            future.complete(ret);
1150          }
1151        });
1152    });
1153    return future;
1154  }
1155
1156  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1157    byte[] columnFamily, boolean writeFlushWALMarker) {
1158    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1159      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1160        FlushRegionResponse> adminCall(controller, stub,
1161          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1162            writeFlushWALMarker),
1163          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1164      .call();
1165  }
1166
1167  @Override
1168  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1169    CompletableFuture<Void> future = new CompletableFuture<>();
1170    addListener(getRegions(sn), (hRegionInfos, err) -> {
1171      if (err != null) {
1172        future.completeExceptionally(err);
1173        return;
1174      }
1175      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1176      if (hRegionInfos != null) {
1177        hRegionInfos
1178          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1179          })));
1180      }
1181      addListener(CompletableFuture.allOf(
1182        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1183          if (err2 != null) {
1184            future.completeExceptionally(err2);
1185          } else {
1186            future.complete(ret);
1187          }
1188        });
1189    });
1190    return future;
1191  }
1192
1193  @Override
1194  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1195    return compact(tableName, null, false, compactType);
1196  }
1197
1198  @Override
1199  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1200    CompactType compactType) {
1201    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1202      + "If you don't specify a columnFamily, use compact(TableName) instead");
1203    return compact(tableName, columnFamily, false, compactType);
1204  }
1205
1206  @Override
1207  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1208    return compactRegion(regionName, null, false);
1209  }
1210
1211  @Override
1212  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1213    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1214      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1215    return compactRegion(regionName, columnFamily, false);
1216  }
1217
1218  @Override
1219  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1220    return compact(tableName, null, true, compactType);
1221  }
1222
1223  @Override
1224  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1225    CompactType compactType) {
1226    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1227      + "If you don't specify a columnFamily, use compact(TableName) instead");
1228    return compact(tableName, columnFamily, true, compactType);
1229  }
1230
1231  @Override
1232  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1233    return compactRegion(regionName, null, true);
1234  }
1235
1236  @Override
1237  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1238    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1239      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1240    return compactRegion(regionName, columnFamily, true);
1241  }
1242
1243  @Override
1244  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1245    return compactRegionServer(sn, false);
1246  }
1247
1248  @Override
1249  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1250    return compactRegionServer(sn, true);
1251  }
1252
1253  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1254    CompletableFuture<Void> future = new CompletableFuture<>();
1255    addListener(getRegions(sn), (hRegionInfos, err) -> {
1256      if (err != null) {
1257        future.completeExceptionally(err);
1258        return;
1259      }
1260      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1261      if (hRegionInfos != null) {
1262        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1263      }
1264      addListener(CompletableFuture.allOf(
1265        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1266          if (err2 != null) {
1267            future.completeExceptionally(err2);
1268          } else {
1269            future.complete(ret);
1270          }
1271        });
1272    });
1273    return future;
1274  }
1275
1276  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1277    boolean major) {
1278    CompletableFuture<Void> future = new CompletableFuture<>();
1279    addListener(getRegionLocation(regionName), (location, err) -> {
1280      if (err != null) {
1281        future.completeExceptionally(err);
1282        return;
1283      }
1284      ServerName serverName = location.getServerName();
1285      if (serverName == null) {
1286        future
1287          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1288        return;
1289      }
1290      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1291        (ret, err2) -> {
1292          if (err2 != null) {
1293            future.completeExceptionally(err2);
1294          } else {
1295            future.complete(ret);
1296          }
1297        });
1298    });
1299    return future;
1300  }
1301
1302  /**
1303   * List all region locations for the specific table.
1304   */
1305  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1306    if (TableName.META_TABLE_NAME.equals(tableName)) {
1307      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1308      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1309        if (err != null) {
1310          future.completeExceptionally(err);
1311        } else if (
1312          metaRegions == null || metaRegions.isEmpty()
1313            || metaRegions.getDefaultRegionLocation() == null
1314        ) {
1315          future.completeExceptionally(new IOException("meta region does not found"));
1316        } else {
1317          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1318        }
1319      });
1320      return future;
1321    } else {
1322      // For non-meta table, we fetch all locations by scanning hbase:meta table
1323      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1324    }
1325  }
1326
1327  /**
1328   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1329   */
1330  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1331    CompactType compactType) {
1332    CompletableFuture<Void> future = new CompletableFuture<>();
1333
1334    switch (compactType) {
1335      case MOB:
1336        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1337          if (err != null) {
1338            future.completeExceptionally(err);
1339            return;
1340          }
1341          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1342          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1343            if (err2 != null) {
1344              future.completeExceptionally(err2);
1345            } else {
1346              future.complete(ret);
1347            }
1348          });
1349        });
1350        break;
1351      case NORMAL:
1352        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1353          if (err != null) {
1354            future.completeExceptionally(err);
1355            return;
1356          }
1357          if (locations == null || locations.isEmpty()) {
1358            future.completeExceptionally(new TableNotFoundException(tableName));
1359            return;
1360          }
1361          CompletableFuture<?>[] compactFutures =
1362            locations.stream().filter(l -> l.getRegion() != null)
1363              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1364              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1365              .toArray(CompletableFuture<?>[]::new);
1366          // future complete unless all of the compact futures are completed.
1367          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1368            if (err2 != null) {
1369              future.completeExceptionally(err2);
1370            } else {
1371              future.complete(ret);
1372            }
1373          });
1374        });
1375        break;
1376      default:
1377        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1378    }
1379    return future;
1380  }
1381
1382  /**
1383   * Compact the region at specific region server.
1384   */
1385  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1386    final boolean major, byte[] columnFamily) {
1387    return this.<Void> newAdminCaller().serverName(sn)
1388      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1389        Void> adminCall(controller, stub,
1390          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1391          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1392      .call();
1393  }
1394
1395  private byte[] toEncodeRegionName(byte[] regionName) {
1396    return RegionInfo.isEncodedRegionName(regionName)
1397      ? regionName
1398      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1399  }
1400
1401  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1402    CompletableFuture<TableName> result) {
1403    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1404      if (err != null) {
1405        result.completeExceptionally(err);
1406        return;
1407      }
1408      RegionInfo regionInfo = location.getRegion();
1409      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1410        result.completeExceptionally(
1411          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1412        return;
1413      }
1414      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1415        if (!tableName.get().equals(regionInfo.getTable())) {
1416          // tables of this two region should be same.
1417          result.completeExceptionally(
1418            new IllegalArgumentException("Cannot merge regions from two different tables "
1419              + tableName.get() + " and " + regionInfo.getTable()));
1420        } else {
1421          result.complete(tableName.get());
1422        }
1423      }
1424    });
1425  }
1426
1427  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1428    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1429    CompletableFuture<TableName> future = new CompletableFuture<>();
1430    for (byte[] encodedRegionName : encodedRegionNames) {
1431      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1432    }
1433    return future;
1434  }
1435
1436  @Override
1437  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1438    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1439  }
1440
1441  @Override
1442  public CompletableFuture<Boolean> isMergeEnabled() {
1443    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1444  }
1445
1446  @Override
1447  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1448    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1449  }
1450
1451  @Override
1452  public CompletableFuture<Boolean> isSplitEnabled() {
1453    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1454  }
1455
1456  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1457    MasterSwitchType switchType) {
1458    SetSplitOrMergeEnabledRequest request =
1459      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1460    return this.<Boolean> newMasterCaller()
1461      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1462        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1463          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1464          (resp) -> resp.getPrevValueList().get(0)))
1465      .call();
1466  }
1467
1468  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1469    IsSplitOrMergeEnabledRequest request =
1470      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1471    return this.<Boolean> newMasterCaller()
1472      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1473        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1474          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1475      .call();
1476  }
1477
1478  @Override
1479  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1480    if (nameOfRegionsToMerge.size() < 2) {
1481      return failedFuture(new IllegalArgumentException(
1482        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1483    }
1484    CompletableFuture<Void> future = new CompletableFuture<>();
1485    byte[][] encodedNameOfRegionsToMerge =
1486      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1487
1488    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1489      if (err != null) {
1490        future.completeExceptionally(err);
1491        return;
1492      }
1493
1494      final MergeTableRegionsRequest request;
1495      try {
1496        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1497          forcible, ng.getNonceGroup(), ng.newNonce());
1498      } catch (DeserializationException e) {
1499        future.completeExceptionally(e);
1500        return;
1501      }
1502
1503      addListener(
1504        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1505          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1506        (ret, err2) -> {
1507          if (err2 != null) {
1508            future.completeExceptionally(err2);
1509          } else {
1510            future.complete(ret);
1511          }
1512        });
1513    });
1514    return future;
1515  }
1516
1517  @Override
1518  public CompletableFuture<Void> split(TableName tableName) {
1519    CompletableFuture<Void> future = new CompletableFuture<>();
1520    addListener(tableExists(tableName), (exist, error) -> {
1521      if (error != null) {
1522        future.completeExceptionally(error);
1523        return;
1524      }
1525      if (!exist) {
1526        future.completeExceptionally(new TableNotFoundException(tableName));
1527        return;
1528      }
1529      addListener(metaTable
1530        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1531          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1532            ClientMetaTableAccessor.QueryType.REGION))
1533          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1534            ClientMetaTableAccessor.QueryType.REGION))),
1535        (results, err2) -> {
1536          if (err2 != null) {
1537            future.completeExceptionally(err2);
1538            return;
1539          }
1540          if (results != null && !results.isEmpty()) {
1541            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1542            for (Result r : results) {
1543              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1544                continue;
1545              }
1546              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1547              if (rl != null) {
1548                for (HRegionLocation h : rl.getRegionLocations()) {
1549                  if (h != null && h.getServerName() != null) {
1550                    RegionInfo hri = h.getRegion();
1551                    if (
1552                      hri == null || hri.isSplitParent()
1553                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1554                    ) {
1555                      continue;
1556                    }
1557                    splitFutures.add(split(hri, null));
1558                  }
1559                }
1560              }
1561            }
1562            addListener(
1563              CompletableFuture
1564                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1565              (ret, exception) -> {
1566                if (exception != null) {
1567                  future.completeExceptionally(exception);
1568                  return;
1569                }
1570                future.complete(ret);
1571              });
1572          } else {
1573            future.complete(null);
1574          }
1575        });
1576    });
1577    return future;
1578  }
1579
1580  @Override
1581  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1582    CompletableFuture<Void> result = new CompletableFuture<>();
1583    if (splitPoint == null) {
1584      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1585    }
1586    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1587      (loc, err) -> {
1588        if (err != null) {
1589          result.completeExceptionally(err);
1590        } else if (loc == null || loc.getRegion() == null) {
1591          result.completeExceptionally(new IllegalArgumentException(
1592            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1593        } else {
1594          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1595            if (err2 != null) {
1596              result.completeExceptionally(err2);
1597            } else {
1598              result.complete(ret);
1599            }
1600
1601          });
1602        }
1603      });
1604    return result;
1605  }
1606
1607  @Override
1608  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1609    CompletableFuture<Void> future = new CompletableFuture<>();
1610    addListener(getRegionLocation(regionName), (location, err) -> {
1611      if (err != null) {
1612        future.completeExceptionally(err);
1613        return;
1614      }
1615      RegionInfo regionInfo = location.getRegion();
1616      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1617        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1618          + "Replicas are auto-split when their primary is split."));
1619        return;
1620      }
1621      ServerName serverName = location.getServerName();
1622      if (serverName == null) {
1623        future
1624          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1625        return;
1626      }
1627      addListener(split(regionInfo, null), (ret, err2) -> {
1628        if (err2 != null) {
1629          future.completeExceptionally(err2);
1630        } else {
1631          future.complete(ret);
1632        }
1633      });
1634    });
1635    return future;
1636  }
1637
1638  @Override
1639  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1640    Preconditions.checkNotNull(splitPoint,
1641      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1642    CompletableFuture<Void> future = new CompletableFuture<>();
1643    addListener(getRegionLocation(regionName), (location, err) -> {
1644      if (err != null) {
1645        future.completeExceptionally(err);
1646        return;
1647      }
1648      RegionInfo regionInfo = location.getRegion();
1649      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1650        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1651          + "Replicas are auto-split when their primary is split."));
1652        return;
1653      }
1654      ServerName serverName = location.getServerName();
1655      if (serverName == null) {
1656        future
1657          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1658        return;
1659      }
1660      if (
1661        regionInfo.getStartKey() != null
1662          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1663      ) {
1664        future.completeExceptionally(
1665          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1666        return;
1667      }
1668      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1669        if (err2 != null) {
1670          future.completeExceptionally(err2);
1671        } else {
1672          future.complete(ret);
1673        }
1674      });
1675    });
1676    return future;
1677  }
1678
1679  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1680    CompletableFuture<Void> future = new CompletableFuture<>();
1681    TableName tableName = hri.getTable();
1682    final SplitTableRegionRequest request;
1683    try {
1684      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1685        ng.newNonce());
1686    } catch (DeserializationException e) {
1687      future.completeExceptionally(e);
1688      return future;
1689    }
1690
1691    addListener(
1692      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1693        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1694      (ret, err2) -> {
1695        if (err2 != null) {
1696          future.completeExceptionally(err2);
1697        } else {
1698          future.complete(ret);
1699        }
1700      });
1701    return future;
1702  }
1703
1704  @Override
1705  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1706    CompletableFuture<Void> future = new CompletableFuture<>();
1707    addListener(getRegionLocation(regionName), (location, err) -> {
1708      if (err != null) {
1709        future.completeExceptionally(err);
1710        return;
1711      }
1712      RegionInfo regionInfo = location.getRegion();
1713      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1714        future.completeExceptionally(new IllegalArgumentException(
1715          "Can't truncate replicas directly.Replicas are auto-truncated "
1716            + "when their primary is truncated."));
1717        return;
1718      }
1719      ServerName serverName = location.getServerName();
1720      if (serverName == null) {
1721        future
1722          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1723        return;
1724      }
1725      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1726        if (err2 != null) {
1727          future.completeExceptionally(err2);
1728        } else {
1729          future.complete(ret);
1730        }
1731      });
1732    });
1733    return future;
1734  }
1735
1736  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1737    CompletableFuture<Void> future = new CompletableFuture<>();
1738    TableName tableName = hri.getTable();
1739    final MasterProtos.TruncateRegionRequest request;
1740    try {
1741      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1742    } catch (DeserializationException e) {
1743      future.completeExceptionally(e);
1744      return future;
1745    }
1746    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1747      MasterProtos.TruncateRegionResponse::getProcId,
1748      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1749        if (err2 != null) {
1750          future.completeExceptionally(err2);
1751        } else {
1752          future.complete(ret);
1753        }
1754      });
1755    return future;
1756  }
1757
1758  @Override
1759  public CompletableFuture<Void> assign(byte[] regionName) {
1760    CompletableFuture<Void> future = new CompletableFuture<>();
1761    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1762      if (err != null) {
1763        future.completeExceptionally(err);
1764        return;
1765      }
1766      addListener(
1767        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1768          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1769            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1770            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1771          .call(),
1772        (ret, err2) -> {
1773          if (err2 != null) {
1774            future.completeExceptionally(err2);
1775          } else {
1776            future.complete(ret);
1777          }
1778        });
1779    });
1780    return future;
1781  }
1782
1783  @Override
1784  public CompletableFuture<Void> unassign(byte[] regionName) {
1785    CompletableFuture<Void> future = new CompletableFuture<>();
1786    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1787      if (err != null) {
1788        future.completeExceptionally(err);
1789        return;
1790      }
1791      addListener(
1792        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1793          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1794            Void> call(controller, stub,
1795              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1796              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1797          .call(),
1798        (ret, err2) -> {
1799          if (err2 != null) {
1800            future.completeExceptionally(err2);
1801          } else {
1802            future.complete(ret);
1803          }
1804        });
1805    });
1806    return future;
1807  }
1808
1809  @Override
1810  public CompletableFuture<Void> offline(byte[] regionName) {
1811    CompletableFuture<Void> future = new CompletableFuture<>();
1812    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1813      if (err != null) {
1814        future.completeExceptionally(err);
1815        return;
1816      }
1817      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1818        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1819          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1820          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1821        .call(), (ret, err2) -> {
1822          if (err2 != null) {
1823            future.completeExceptionally(err2);
1824          } else {
1825            future.complete(ret);
1826          }
1827        });
1828    });
1829    return future;
1830  }
1831
1832  @Override
1833  public CompletableFuture<Void> move(byte[] regionName) {
1834    CompletableFuture<Void> future = new CompletableFuture<>();
1835    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1836      if (err != null) {
1837        future.completeExceptionally(err);
1838        return;
1839      }
1840      addListener(
1841        moveRegion(regionInfo,
1842          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1843        (ret, err2) -> {
1844          if (err2 != null) {
1845            future.completeExceptionally(err2);
1846          } else {
1847            future.complete(ret);
1848          }
1849        });
1850    });
1851    return future;
1852  }
1853
1854  @Override
1855  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1856    Preconditions.checkNotNull(destServerName,
1857      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1858    CompletableFuture<Void> future = new CompletableFuture<>();
1859    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1860      if (err != null) {
1861        future.completeExceptionally(err);
1862        return;
1863      }
1864      addListener(
1865        moveRegion(regionInfo, RequestConverter
1866          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1867        (ret, err2) -> {
1868          if (err2 != null) {
1869            future.completeExceptionally(err2);
1870          } else {
1871            future.complete(ret);
1872          }
1873        });
1874    });
1875    return future;
1876  }
1877
1878  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1879    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1880      .action(
1881        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1882          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1883      .call();
1884  }
1885
1886  @Override
1887  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1888    return this.<Void> newMasterCaller()
1889      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1890        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1891        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1892      .call();
1893  }
1894
1895  @Override
1896  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1897    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1898    Scan scan = QuotaTableUtil.makeScan(filter);
1899    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1900      new AdvancedScanResultConsumer() {
1901        List<QuotaSettings> settings = new ArrayList<>();
1902
1903        @Override
1904        public void onNext(Result[] results, ScanController controller) {
1905          for (Result result : results) {
1906            try {
1907              QuotaTableUtil.parseResultToCollection(result, settings);
1908            } catch (IOException e) {
1909              controller.terminate();
1910              future.completeExceptionally(e);
1911            }
1912          }
1913        }
1914
1915        @Override
1916        public void onError(Throwable error) {
1917          future.completeExceptionally(error);
1918        }
1919
1920        @Override
1921        public void onComplete() {
1922          future.complete(settings);
1923        }
1924      });
1925    return future;
1926  }
1927
1928  @Override
1929  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1930    boolean enabled) {
1931    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1932      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1933      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1934      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1935  }
1936
1937  @Override
1938  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1939    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1940      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1941      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1942      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1943  }
1944
1945  @Override
1946  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1947    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1948      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1949      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1950      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1951  }
1952
1953  @Override
1954  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1955    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1956      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1957      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1958      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1959  }
1960
1961  @Override
1962  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1963    return this.<ReplicationPeerConfig> newMasterCaller()
1964      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1965        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1966          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1967          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1968          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1969      .call();
1970  }
1971
1972  @Override
1973  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1974    ReplicationPeerConfig peerConfig) {
1975    return this.<UpdateReplicationPeerConfigRequest,
1976      UpdateReplicationPeerConfigResponse> procedureCall(
1977        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1978        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1979        (resp) -> resp.getProcId(),
1980        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1981  }
1982
1983  @Override
1984  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1985    SyncReplicationState clusterState) {
1986    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1987      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1988        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1989          clusterState),
1990        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1991        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1992          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1993  }
1994
1995  @Override
1996  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1997    Map<TableName, List<String>> tableCfs) {
1998    if (tableCfs == null) {
1999      return failedFuture(new ReplicationException("tableCfs is null"));
2000    }
2001
2002    CompletableFuture<Void> future = new CompletableFuture<>();
2003    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2004      if (!completeExceptionally(future, error)) {
2005        ReplicationPeerConfig newPeerConfig =
2006          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
2007        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2008          if (!completeExceptionally(future, error)) {
2009            future.complete(result);
2010          }
2011        });
2012      }
2013    });
2014    return future;
2015  }
2016
2017  @Override
2018  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
2019    Map<TableName, List<String>> tableCfs) {
2020    if (tableCfs == null) {
2021      return failedFuture(new ReplicationException("tableCfs is null"));
2022    }
2023
2024    CompletableFuture<Void> future = new CompletableFuture<>();
2025    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2026      if (!completeExceptionally(future, error)) {
2027        ReplicationPeerConfig newPeerConfig = null;
2028        try {
2029          newPeerConfig = ReplicationPeerConfigUtil
2030            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
2031        } catch (ReplicationException e) {
2032          future.completeExceptionally(e);
2033          return;
2034        }
2035        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2036          if (!completeExceptionally(future, error)) {
2037            future.complete(result);
2038          }
2039        });
2040      }
2041    });
2042    return future;
2043  }
2044
2045  @Override
2046  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
2047    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
2048  }
2049
2050  @Override
2051  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
2052    Preconditions.checkNotNull(pattern,
2053      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
2054    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
2055  }
2056
2057  private CompletableFuture<List<ReplicationPeerDescription>>
2058    listReplicationPeers(ListReplicationPeersRequest request) {
2059    return this.<List<ReplicationPeerDescription>> newMasterCaller()
2060      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
2061        List<ReplicationPeerDescription>> call(controller, stub, request,
2062          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
2063          (resp) -> resp.getPeerDescList().stream()
2064            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2065            .collect(Collectors.toList())))
2066      .call();
2067  }
2068
2069  @Override
2070  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2071    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2072    addListener(listTableDescriptors(), (tables, error) -> {
2073      if (!completeExceptionally(future, error)) {
2074        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2075        tables.forEach(table -> {
2076          Map<String, Integer> cfs = new HashMap<>();
2077          Stream.of(table.getColumnFamilies())
2078            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2079            .forEach(column -> {
2080              cfs.put(column.getNameAsString(), column.getScope());
2081            });
2082          if (!cfs.isEmpty()) {
2083            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2084          }
2085        });
2086        future.complete(replicatedTableCFs);
2087      }
2088    });
2089    return future;
2090  }
2091
2092  @Override
2093  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2094    SnapshotProtos.SnapshotDescription snapshot =
2095      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2096    try {
2097      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2098    } catch (IllegalArgumentException e) {
2099      return failedFuture(e);
2100    }
2101    CompletableFuture<Void> future = new CompletableFuture<>();
2102    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2103      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2104    addListener(this.<SnapshotResponse> newMasterCaller()
2105      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2106        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2107      .call(), (resp, err) -> {
2108        if (err != null) {
2109          future.completeExceptionally(err);
2110          return;
2111        }
2112        waitSnapshotFinish(snapshotDesc, future, resp);
2113      });
2114    return future;
2115  }
2116
  // This is for keeping compatibility with old implementation.
  // If there is a procId field in the response, then the snapshot will be operated with a
  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
  //
  // Completes 'future' once the snapshot described by 'snapshot' is finished:
  // - with a procId: delegates to getProcedureResult and attaches a SnapshotProcedureBiConsumer
  // listener to 'future';
  // - without a procId: polls isSnapshotFinished on RETRY_TIMER until it reports done, an error
  // occurs, or the expected timeout from the response has elapsed.
  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
    SnapshotResponse resp) {
    if (resp.hasProcId()) {
      getProcedureResult(resp.getProcId(), src -> null, future, 0);
      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
    } else {
      long expectedTimeout = resp.getExpectedTimeout();
      TimerTask pollingTask = new TimerTask() {
        // Number of re-polls scheduled so far; feeds the pause-time computation.
        int tries = 0;
        long startTime = EnvironmentEdgeManager.currentTime();
        // Deadline after which the snapshot is reported as not completed in time.
        long endTime = startTime + expectedTimeout;
        // Upper bound for a single pause between polls.
        long maxPauseTime = expectedTimeout / maxAttempts;

        @Override
        public void run(Timeout timeout) throws Exception {
          if (EnvironmentEdgeManager.currentTime() < endTime) {
            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else if (done) {
                future.complete(null);
              } else {
                // retry again after pauseTime.
                // pauseTime is in milliseconds (pauseNs is converted with toMillis) and is
                // capped at maxPauseTime before this same task is re-queued.
                long pauseTime =
                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                pauseTime = Math.min(pauseTime, maxPauseTime);
                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
              }
            });
          } else {
            // Deadline passed without the snapshot being reported as done.
            future
              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
          }
        }
      };
      // Kick off the first poll after 1 ms.
      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
    }
  }
2159
2160  @Override
2161  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2162    return this.<Boolean> newMasterCaller()
2163      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2164        Boolean> call(controller, stub,
2165          IsSnapshotDoneRequest.newBuilder()
2166            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2167          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2168      .call();
2169  }
2170
2171  @Override
2172  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2173    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2174      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2175      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2176    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2177  }
2178
2179  @Override
2180  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2181    boolean restoreAcl) {
2182    CompletableFuture<Void> future = new CompletableFuture<>();
2183    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2184      if (err != null) {
2185        future.completeExceptionally(err);
2186        return;
2187      }
2188      TableName tableName = null;
2189      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2190        for (SnapshotDescription snap : snapshotDescriptions) {
2191          if (snap.getName().equals(snapshotName)) {
2192            tableName = snap.getTableName();
2193            break;
2194          }
2195        }
2196      }
2197      if (tableName == null) {
2198        future.completeExceptionally(
2199          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2200        return;
2201      }
2202      final TableName finalTableName = tableName;
2203      addListener(tableExists(finalTableName), (exists, err2) -> {
2204        if (err2 != null) {
2205          future.completeExceptionally(err2);
2206        } else if (!exists) {
2207          // if table does not exist, then just clone snapshot into new table.
2208          completeConditionalOnFuture(future,
2209            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2210        } else {
2211          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2212            if (err4 != null) {
2213              future.completeExceptionally(err4);
2214            } else if (!disabled) {
2215              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2216            } else {
2217              completeConditionalOnFuture(future,
2218                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2219            }
2220          });
2221        }
2222      });
2223    });
2224    return future;
2225  }
2226
  /**
   * Restores {@code snapshotName} onto {@code tableName}, optionally protected by a failsafe
   * snapshot of the table's current state.
   * <p>
   * Without {@code takeFailSafeSnapshot} this is a plain {@code internalRestoreSnapshot}. With
   * it, the sequence is: take the failsafe snapshot, restore the requested snapshot, and then
   * <ul>
   * <li>on restore success, delete the failsafe snapshot (a failed delete is only logged) and
   * complete normally;</li>
   * <li>on restore failure, roll back by restoring the failsafe snapshot; if the rollback works
   * the failsafe snapshot is deleted and the future fails with a
   * {@code RestoreSnapshotException} wrapping the original restore error; if the rollback itself
   * fails, the future fails with the rollback error.</li>
   * </ul>
   */
  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
    boolean takeFailSafeSnapshot, boolean restoreAcl) {
    if (takeFailSafeSnapshot) {
      CompletableFuture<Void> future = new CompletableFuture<>();
      // Step.1 Take a snapshot of the current state
      // The failsafe name comes from a configurable format with {snapshot.name}, {table.name}
      // and {restore.timestamp} placeholders; the namespace delimiter in the table name is
      // replaced with '.'.
      String failSafeSnapshotSnapshotNameFormat =
        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
      final String failSafeSnapshotSnapshotName =
        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
        if (err != null) {
          // Could not take the failsafe snapshot: abort before touching the table.
          future.completeExceptionally(err);
        } else {
          // Step.2 Restore snapshot
          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
            (void2, err2) -> {
              if (err2 != null) {
                // Step.3.a Something went wrong during the restore and try to rollback.
                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
                  restoreAcl, null), (void3, err3) -> {
                    if (err3 != null) {
                      // Rollback failed too; the rollback error is what the caller sees.
                      future.completeExceptionally(err3);
                    } else {
                      // If fail to restore snapshot but rollback successfully, delete the
                      // restore-failsafe snapshot.
                      LOG.info(
                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
                        if (err4 != null) {
                          LOG.error("Unable to remove the failsafe snapshot: {}",
                            failSafeSnapshotSnapshotName, err4);
                        }
                        // Whether or not the delete worked, the restore itself failed, so the
                        // future fails with the original restore error as the cause.
                        String msg =
                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
                            + failSafeSnapshotSnapshotName + " succeeded.";
                        LOG.error(msg);
                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
                      });
                    }
                  });
              } else {
                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
                  if (err3 != null) {
                    LOG.error(
                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
                      err3);
                  }
                  // A failed cleanup is only logged; the restore is still reported as a success.
                  future.complete(ret3);
                });
              }
            });
        }
      });
      return future;
    } else {
      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
    }
  }
2291
2292  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2293    CompletableFuture<T> parentFuture) {
2294    addListener(parentFuture, (res, err) -> {
2295      if (err != null) {
2296        dependentFuture.completeExceptionally(err);
2297      } else {
2298        dependentFuture.complete(res);
2299      }
2300    });
2301  }
2302
2303  @Override
2304  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2305    boolean restoreAcl, String customSFT) {
2306    CompletableFuture<Void> future = new CompletableFuture<>();
2307    addListener(tableExists(tableName), (exists, err) -> {
2308      if (err != null) {
2309        future.completeExceptionally(err);
2310      } else if (exists) {
2311        future.completeExceptionally(new TableExistsException(tableName));
2312      } else {
2313        completeConditionalOnFuture(future,
2314          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2315      }
2316    });
2317    return future;
2318  }
2319
2320  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2321    boolean restoreAcl, String customSFT) {
2322    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2323      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2324    try {
2325      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2326    } catch (IllegalArgumentException e) {
2327      return failedFuture(e);
2328    }
2329    RestoreSnapshotRequest.Builder builder =
2330      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2331        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2332    if (customSFT != null) {
2333      builder.setCustomSFT(customSFT);
2334    }
2335    return waitProcedureResult(this.<Long> newMasterCaller()
2336      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2337        Long> call(controller, stub, builder.build(),
2338          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2339      .call(), result -> null);
2340  }
2341
2342  @Override
2343  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2344    return getCompletedSnapshots(null);
2345  }
2346
2347  @Override
2348  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2349    Preconditions.checkNotNull(pattern,
2350      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2351    return getCompletedSnapshots(pattern);
2352  }
2353
2354  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2355    return this.<List<SnapshotDescription>> newMasterCaller()
2356      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2357        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2358          GetCompletedSnapshotsRequest.newBuilder().build(),
2359          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2360          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2361      .call();
2362  }
2363
2364  @Override
2365  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2366    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2367      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2368    return getCompletedSnapshots(tableNamePattern, null);
2369  }
2370
2371  @Override
2372  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2373    Pattern snapshotNamePattern) {
2374    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2375      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2376    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2377      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2378    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2379  }
2380
2381  private CompletableFuture<List<SnapshotDescription>>
2382    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2383    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2384    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2385      if (err != null) {
2386        future.completeExceptionally(err);
2387        return;
2388      }
2389      if (tableNames == null || tableNames.size() <= 0) {
2390        future.complete(Collections.emptyList());
2391        return;
2392      }
2393      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2394        if (err2 != null) {
2395          future.completeExceptionally(err2);
2396          return;
2397        }
2398        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2399          future.complete(Collections.emptyList());
2400          return;
2401        }
2402        future.complete(snapshotDescList.stream()
2403          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2404          .collect(Collectors.toList()));
2405      });
2406    });
2407    return future;
2408  }
2409
2410  @Override
2411  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2412    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2413  }
2414
2415  @Override
2416  public CompletableFuture<Void> deleteSnapshots() {
2417    return internalDeleteSnapshots(null, null);
2418  }
2419
2420  @Override
2421  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2422    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2423      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2424    return internalDeleteSnapshots(null, snapshotNamePattern);
2425  }
2426
2427  @Override
2428  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2429    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2430      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2431    return internalDeleteSnapshots(tableNamePattern, null);
2432  }
2433
2434  @Override
2435  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2436    Pattern snapshotNamePattern) {
2437    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2438      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2439    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2440      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2441    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2442  }
2443
2444  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2445    Pattern snapshotNamePattern) {
2446    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2447    if (tableNamePattern == null) {
2448      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2449    } else {
2450      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2451    }
2452    CompletableFuture<Void> future = new CompletableFuture<>();
2453    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2454      if (err != null) {
2455        future.completeExceptionally(err);
2456        return;
2457      }
2458      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2459        future.complete(null);
2460        return;
2461      }
2462      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2463        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2464          if (e != null) {
2465            future.completeExceptionally(e);
2466          } else {
2467            future.complete(v);
2468          }
2469        });
2470    });
2471    return future;
2472  }
2473
2474  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2475    return this.<Void> newMasterCaller()
2476      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2477        controller, stub,
2478        DeleteSnapshotRequest.newBuilder()
2479          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2480        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2481      .call();
2482  }
2483
2484  @Override
2485  public CompletableFuture<Void> execProcedure(String signature, String instance,
2486    Map<String, String> props) {
2487    CompletableFuture<Void> future = new CompletableFuture<>();
2488    ProcedureDescription procDesc =
2489      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2490    addListener(this.<Long> newMasterCaller()
2491      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2492        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2493        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2494      .call(), (expectedTimeout, err) -> {
2495        if (err != null) {
2496          future.completeExceptionally(err);
2497          return;
2498        }
2499        TimerTask pollingTask = new TimerTask() {
2500          int tries = 0;
2501          long startTime = EnvironmentEdgeManager.currentTime();
2502          long endTime = startTime + expectedTimeout;
2503          long maxPauseTime = expectedTimeout / maxAttempts;
2504
2505          @Override
2506          public void run(Timeout timeout) throws Exception {
2507            if (EnvironmentEdgeManager.currentTime() < endTime) {
2508              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2509                if (err2 != null) {
2510                  future.completeExceptionally(err2);
2511                  return;
2512                }
2513                if (done) {
2514                  future.complete(null);
2515                } else {
2516                  // retry again after pauseTime.
2517                  long pauseTime =
2518                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2519                  pauseTime = Math.min(pauseTime, maxPauseTime);
2520                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2521                    TimeUnit.MICROSECONDS);
2522                }
2523              });
2524            } else {
2525              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2526                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2527            }
2528          }
2529        };
2530        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2531        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2532      });
2533    return future;
2534  }
2535
2536  @Override
2537  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2538    Map<String, String> props) {
2539    ProcedureDescription proDesc =
2540      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2541    return this.<byte[]> newMasterCaller()
2542      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2543        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2544        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2545        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2546      .call();
2547  }
2548
2549  @Override
2550  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2551    Map<String, String> props) {
2552    ProcedureDescription proDesc =
2553      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2554    return this.<Boolean> newMasterCaller()
2555      .action(
2556        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2557          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2558          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2559      .call();
2560  }
2561
2562  @Override
2563  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2564    return this.<Boolean> newMasterCaller().action(
2565      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2566        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2567        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2568      .call();
2569  }
2570
2571  @Override
2572  public CompletableFuture<String> getProcedures() {
2573    return this.<String> newMasterCaller()
2574      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2575        controller, stub, GetProceduresRequest.newBuilder().build(),
2576        (s, c, req, done) -> s.getProcedures(c, req, done),
2577        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2578      .call();
2579  }
2580
2581  @Override
2582  public CompletableFuture<String> getLocks() {
2583    return this.<String> newMasterCaller()
2584      .action(
2585        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2586          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2587          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2588      .call();
2589  }
2590
2591  @Override
2592  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2593    boolean offload) {
2594    return this.<Void> newMasterCaller()
2595      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2596        DecommissionRegionServersResponse, Void> call(controller, stub,
2597          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2598          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2599      .call();
2600  }
2601
2602  @Override
2603  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2604    return this.<List<ServerName>> newMasterCaller()
2605      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2606        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2607          ListDecommissionedRegionServersRequest.newBuilder().build(),
2608          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2609          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2610            .collect(Collectors.toList())))
2611      .call();
2612  }
2613
2614  @Override
2615  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2616    List<byte[]> encodedRegionNames) {
2617    return this.<Void> newMasterCaller()
2618      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2619        RecommissionRegionServerResponse, Void> call(controller, stub,
2620          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2621          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2622      .call();
2623  }
2624
2625  /**
2626   * Get the region location for the passed region name. The region name may be a full region name
2627   * or encoded region name. If the region does not found, then it'll throw an
2628   * UnknownRegionException wrapped by a {@link CompletableFuture}
2629   * @param regionNameOrEncodedRegionName region name or encoded region name
2630   * @return region location, wrapped by a {@link CompletableFuture}
2631   */
2632  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2633    if (regionNameOrEncodedRegionName == null) {
2634      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2635    }
2636
2637    CompletableFuture<Optional<HRegionLocation>> future;
2638    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2639      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2640      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2641        // old format encodedName, should be meta region
2642        future = connection.registry.getMetaRegionLocations()
2643          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2644            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2645      } else {
2646        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2647          regionNameOrEncodedRegionName);
2648      }
2649    } else {
2650      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2651      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2652      RegionInfo regionInfo;
2653      try {
2654        regionInfo =
2655          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2656      } catch (IOException ioe) {
2657        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2658      }
2659
2660      if (regionInfo.isMetaRegion()) {
2661        future = connection.registry.getMetaRegionLocations()
2662          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2663            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2664            .findFirst());
2665      } else {
2666        future =
2667          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2668      }
2669    }
2670
2671    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2672    addListener(future, (location, err) -> {
2673      if (err != null) {
2674        returnedFuture.completeExceptionally(err);
2675        return;
2676      }
2677      if (!location.isPresent() || location.get().getRegion() == null) {
2678        returnedFuture.completeExceptionally(
2679          new UnknownRegionException("Invalid region name or encoded region name: "
2680            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2681      } else {
2682        returnedFuture.complete(location.get());
2683      }
2684    });
2685    return returnedFuture;
2686  }
2687
2688  /**
2689   * Get the region info for the passed region name. The region name may be a full region name or
2690   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2691   * wrapped by a {@link CompletableFuture}
2692   * @return region info, wrapped by a {@link CompletableFuture}
2693   */
2694  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2695    if (regionNameOrEncodedRegionName == null) {
2696      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2697    }
2698
2699    if (
2700      Bytes.equals(regionNameOrEncodedRegionName,
2701        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2702        || Bytes.equals(regionNameOrEncodedRegionName,
2703          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2704    ) {
2705      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2706    }
2707
2708    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2709    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2710      if (err != null) {
2711        future.completeExceptionally(err);
2712      } else {
2713        future.complete(location.getRegion());
2714      }
2715    });
2716    return future;
2717  }
2718
2719  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2720    if (numRegions < 3) {
2721      throw new IllegalArgumentException("Must create at least three regions");
2722    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2723      throw new IllegalArgumentException("Start key must be smaller than end key");
2724    }
2725    if (numRegions == 3) {
2726      return new byte[][] { startKey, endKey };
2727    }
2728    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2729    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2730      throw new IllegalArgumentException("Unable to split key range into enough regions");
2731    }
2732    return splitKeys;
2733  }
2734
2735  private void verifySplitKeys(byte[][] splitKeys) {
2736    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2737    // Verify there are no duplicate split keys
2738    byte[] lastKey = null;
2739    for (byte[] splitKey : splitKeys) {
2740      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2741        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2742      }
2743      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2744        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2745          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2746      }
2747      lastKey = splitKey;
2748    }
2749  }
2750
2751  private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> {
2752
2753    abstract void onFinished();
2754
2755    abstract void onError(Throwable error);
2756
2757    @Override
2758    public void accept(T value, Throwable error) {
2759      if (error != null) {
2760        onError(error);
2761        return;
2762      }
2763      onFinished();
2764    }
2765  }
2766
2767  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2768    protected final TableName tableName;
2769
2770    TableProcedureBiConsumer(TableName tableName) {
2771      this.tableName = tableName;
2772    }
2773
2774    abstract String getOperationType();
2775
2776    String getDescription() {
2777      return "Operation: " + getOperationType() + ", " + "Table Name: "
2778        + tableName.getNameWithNamespaceInclAsString();
2779    }
2780
2781    @Override
2782    void onFinished() {
2783      LOG.info(getDescription() + " completed");
2784    }
2785
2786    @Override
2787    void onError(Throwable error) {
2788      LOG.info(getDescription() + " failed with " + error.getMessage());
2789    }
2790  }
2791
2792  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2793    protected final String namespaceName;
2794
2795    NamespaceProcedureBiConsumer(String namespaceName) {
2796      this.namespaceName = namespaceName;
2797    }
2798
2799    abstract String getOperationType();
2800
2801    String getDescription() {
2802      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2803    }
2804
2805    @Override
2806    void onFinished() {
2807      LOG.info("{} completed", getDescription());
2808    }
2809
2810    @Override
2811    void onError(Throwable error) {
2812      LOG.info("{} failed with {}", getDescription(), error.getMessage());
2813    }
2814  }
2815
  /**
   * Callback that logs the outcome of a RestoreBackupSystemTableProcedure at INFO level.
   * NOTE(review): this extends the raw {@code ProcedureBiConsumer} type although the base class
   * is generic; confirm the intended type argument against the caller before parameterizing it.
   */
  private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer {

    @Override
    void onFinished() {
      LOG.info("RestoreBackupSystemTableProcedure completed");
    }

    @Override
    void onError(Throwable error) {
      // Only the error message is logged, not the stack trace.
      LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage());
    }
  }
2828
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "CREATE"}. */
  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {

    CreateTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "CREATE";
    }
  }
2840
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "MODIFY"}. */
  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {

    // NOTE(review): the admin argument is accepted but never used by this constructor.
    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY";
    }
  }
2852
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "REOPEN_TABLE_REGIONS"}. */
  private static class ReopenTableRegionsProcedureBiConsumer extends TableProcedureBiConsumer {

    // NOTE(review): the admin argument is accepted but never used by this constructor.
    ReopenTableRegionsProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "REOPEN_TABLE_REGIONS";
    }
  }
2864
  /**
   * {@link TableProcedureBiConsumer} whose operation type is
   * {@code "MODIFY_TABLE_STORE_FILE_TRACKER"}.
   */
  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
    extends TableProcedureBiConsumer {

    // NOTE(review): the admin argument is accepted but never used by this constructor.
    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_TABLE_STORE_FILE_TRACKER";
    }
  }
2877
  /**
   * {@link TableProcedureBiConsumer} whose operation type is {@code "DELETE"}. It is an inner
   * (non-static) class because it reads the enclosing instance's {@code connection}.
   */
  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {

    DeleteTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DELETE";
    }

    @Override
    void onFinished() {
      // Clear the locator's cache for the table before logging the completion.
      connection.getLocator().clearCache(this.tableName);
      super.onFinished();
    }
  }
2895
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "TRUNCATE"}. */
  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {

    TruncateTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "TRUNCATE";
    }
  }
2907
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "ENABLE"}. */
  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {

    EnableTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "ENABLE";
    }
  }
2919
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "DISABLE"}. */
  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {

    DisableTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DISABLE";
    }
  }
2931
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "ADD_COLUMN_FAMILY"}. */
  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "ADD_COLUMN_FAMILY";
    }
  }
2943
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "DELETE_COLUMN_FAMILY"}. */
  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DELETE_COLUMN_FAMILY";
    }
  }
2955
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "MODIFY_COLUMN_FAMILY"}. */
  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_COLUMN_FAMILY";
    }
  }
2967
  /**
   * {@link TableProcedureBiConsumer} whose operation type is
   * {@code "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"}.
   */
  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
    extends TableProcedureBiConsumer {

    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
    }
  }
2980
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "FLUSH"}. */
  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {

    FlushTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "FLUSH";
    }
  }
2992
  /** {@link NamespaceProcedureBiConsumer} whose operation type is {@code "CREATE_NAMESPACE"}. */
  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    CreateNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "CREATE_NAMESPACE";
    }
  }
3004
  /** {@link NamespaceProcedureBiConsumer} whose operation type is {@code "DELETE_NAMESPACE"}. */
  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "DELETE_NAMESPACE";
    }
  }
3016
  /** {@link NamespaceProcedureBiConsumer} whose operation type is {@code "MODIFY_NAMESPACE"}. */
  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_NAMESPACE";
    }
  }
3028
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "MERGE_REGIONS"}. */
  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {

    MergeTableRegionProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MERGE_REGIONS";
    }
  }
3040
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "SPLIT_REGION"}. */
  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {

    SplitTableRegionProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "SPLIT_REGION";
    }
  }
3052
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "TRUNCATE_REGION"}. */
  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {

    TruncateRegionProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "TRUNCATE_REGION";
    }
  }
3064
  /** {@link TableProcedureBiConsumer} whose operation type is {@code "SNAPSHOT"}. */
  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
    SnapshotProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "SNAPSHOT";
    }
  }
3075
3076  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> {
3077    private final String peerId;
3078    private final Supplier<String> getOperation;
3079
3080    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
3081      this.peerId = peerId;
3082      this.getOperation = getOperation;
3083    }
3084
3085    String getDescription() {
3086      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3087    }
3088
3089    @Override
3090    void onFinished() {
3091      LOG.info("{} completed", getDescription());
3092    }
3093
3094    @Override
3095    void onError(Throwable error) {
3096      LOG.info("{} failed with {}", getDescription(), error.getMessage());
3097    }
3098  }
3099
  /**
   * Callback for the roll-all-WAL-writers procedure: logs success at INFO level and failure at
   * WARN level. The completion value (a map keyed by server name) is not inspected here.
   */
  private static final class RollAllWALWritersBiConsumer
    extends ProcedureBiConsumer<Map<ServerName, Long>> {

    @Override
    void onFinished() {
      LOG.info("Rolling all WAL writers completed");
    }

    @Override
    void onError(Throwable error) {
      // Only the error message is logged, not the stack trace.
      LOG.warn("Rolling all WAL writers failed with {}", error.getMessage());
    }
  }
3113
3114  private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture,
3115    Converter<T, ByteString> converter) {
3116    CompletableFuture<T> future = new CompletableFuture<>();
3117    addListener(procFuture, (procId, error) -> {
3118      if (error != null) {
3119        future.completeExceptionally(error);
3120        return;
3121      }
3122      getProcedureResult(procId, converter, future, 0);
3123    });
3124    return future;
3125  }
3126
3127  private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter,
3128    CompletableFuture<T> future, int retries) {
3129    addListener(
3130      this.<GetProcedureResultResponse> newMasterCaller()
3131        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3132          GetProcedureResultResponse> call(controller, stub,
3133            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3134            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3135        .call(),
3136      (response, error) -> {
3137        if (error != null) {
3138          Throwable exc = ConnectionUtils.translateException(error);
3139          if (exc instanceof DoNotRetryIOException) {
3140            // stop retrying on DNRIOE
3141            future.completeExceptionally(exc);
3142          } else {
3143            LOG.warn("failed to get the procedure result procId={}", procId, exc);
3144            retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3145              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3146          }
3147          return;
3148        }
3149        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3150          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3151            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3152          return;
3153        }
3154        if (response.hasException()) {
3155          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3156          future.completeExceptionally(ioe);
3157        } else {
3158          try {
3159            future.complete(converter.convert(response.getResult()));
3160          } catch (IOException e) {
3161            future.completeExceptionally(e);
3162          }
3163        }
3164      });
3165  }
3166
3167  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3168    CompletableFuture<T> future = new CompletableFuture<>();
3169    future.completeExceptionally(error);
3170    return future;
3171  }
3172
3173  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3174    if (error != null) {
3175      future.completeExceptionally(error);
3176      return true;
3177    }
3178    return false;
3179  }
3180
3181  @Override
3182  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3183    return getClusterMetrics(EnumSet.allOf(Option.class));
3184  }
3185
3186  @Override
3187  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3188    return this.<ClusterMetrics> newMasterCaller()
3189      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3190        ClusterMetrics> call(controller, stub,
3191          RequestConverter.buildGetClusterStatusRequest(options),
3192          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3193          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3194      .call();
3195  }
3196
3197  @Override
3198  public CompletableFuture<Void> shutdown() {
3199    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3200      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3201        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3202        resp -> null))
3203      .call();
3204  }
3205
3206  @Override
3207  public CompletableFuture<Void> stopMaster() {
3208    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3209      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3210        controller, stub, StopMasterRequest.newBuilder().build(),
3211        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3212      .call();
3213  }
3214
3215  @Override
3216  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3217    StopServerRequest request = RequestConverter
3218      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3219    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3220      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3221        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3222        resp -> null))
3223      .serverName(serverName).call();
3224  }
3225
3226  @Override
3227  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3228    return this.<Void> newAdminCaller()
3229      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3230        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3231          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3232      .serverName(serverName).call();
3233  }
3234
3235  @Override
3236  public CompletableFuture<Void> updateConfiguration() {
3237    CompletableFuture<Void> future = new CompletableFuture<Void>();
3238    addListener(
3239      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3240      (status, err) -> {
3241        if (err != null) {
3242          future.completeExceptionally(err);
3243        } else {
3244          List<CompletableFuture<Void>> futures = new ArrayList<>();
3245          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3246          futures.add(updateConfiguration(status.getMasterName()));
3247          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3248          addListener(
3249            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3250            (result, err2) -> {
3251              if (err2 != null) {
3252                future.completeExceptionally(err2);
3253              } else {
3254                future.complete(result);
3255              }
3256            });
3257        }
3258      });
3259    return future;
3260  }
3261
3262  @Override
3263  public CompletableFuture<Void> updateConfiguration(String groupName) {
3264    CompletableFuture<Void> future = new CompletableFuture<Void>();
3265    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3266      if (err != null) {
3267        future.completeExceptionally(err);
3268      } else if (rsGroupInfo == null) {
3269        future.completeExceptionally(
3270          new IllegalArgumentException("Group does not exist: " + groupName));
3271      } else {
3272        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3273          if (err2 != null) {
3274            future.completeExceptionally(err2);
3275          } else {
3276            List<CompletableFuture<Void>> futures = new ArrayList<>();
3277            List<ServerName> groupServers = status.getServersName().stream()
3278              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3279            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3280            addListener(
3281              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3282              (result, err3) -> {
3283                if (err3 != null) {
3284                  future.completeExceptionally(err3);
3285                } else {
3286                  future.complete(result);
3287                }
3288              });
3289          }
3290        });
3291      }
3292    });
3293    return future;
3294  }
3295
3296  @Override
3297  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3298    return this.<Void> newAdminCaller()
3299      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3300        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3301          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3302      .serverName(serverName).call();
3303  }
3304
3305  @Override
3306  public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
3307    return this
3308      .<RollAllWALWritersRequest, RollAllWALWritersResponse,
3309        Map<ServerName,
3310          Long>> procedureCall(
3311            RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()),
3312            (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(),
3313            result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap()
3314              .entrySet().stream().collect(Collectors
3315                .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)),
3316            new RollAllWALWritersBiConsumer());
3317  }
3318
3319  @Override
3320  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3321    return this.<Void> newAdminCaller()
3322      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3323        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3324          RequestConverter.buildClearCompactionQueuesRequest(queues),
3325          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3326      .serverName(serverName).call();
3327  }
3328
3329  @Override
3330  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3331    return this.<List<SecurityCapability>> newMasterCaller()
3332      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3333        List<SecurityCapability>> call(controller, stub,
3334          SecurityCapabilitiesRequest.newBuilder().build(),
3335          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3336          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3337      .call();
3338  }
3339
3340  @Override
3341  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3342    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3343  }
3344
3345  @Override
3346  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3347    TableName tableName) {
3348    Preconditions.checkNotNull(tableName,
3349      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3350    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3351  }
3352
3353  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3354    ServerName serverName) {
3355    return this.<List<RegionMetrics>> newAdminCaller()
3356      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3357        List<RegionMetrics>> adminCall(controller, stub, request,
3358          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3359          RegionMetricsBuilder::toRegionMetrics))
3360      .serverName(serverName).call();
3361  }
3362
3363  @Override
3364  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3365    return this.<Boolean> newMasterCaller()
3366      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3367        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3368          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3369          resp -> resp.getInMaintenanceMode()))
3370      .call();
3371  }
3372
  /**
   * Compute the compaction state of a table.
   * <ul>
   * <li>{@code MOB}: a region-info request for the table's MOB region info is sent to the server
   * returned by the registry's {@code getActiveMaster()}, and the state in that response is
   * reported ({@code NONE} when the response carries none).</li>
   * <li>{@code NORMAL}: every located region that has a server, a region and is not offline is
   * queried, and the per-region states are combined into one table state.</li>
   * </ul>
   * @param tableName   table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future holding the table's compaction state
   * @throws IllegalArgumentException synchronously, when {@code compactType} is neither
   *                                  {@code MOB} nor {@code NORMAL}
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
    CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // Ask the server obtained above for the region info of the MOB region.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
              GetRegionInfoResponse> adminCall(controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                // A response without a compaction state is reported as NONE.
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Filled from the per-region callbacks below, hence a concurrent collection.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          // Skip locations without a server or region, and regions that are offline.
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If the result has not been decided yet (no region failed and none reported
              // MAJOR_AND_MINOR), combine the collected per-region states.
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      // MAJOR seen after MINOR means the table is doing both.
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      // MINOR seen after MAJOR means the table is doing both.
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // No MAJOR/MINOR mix was found: report the single state seen, or NONE.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3467
3468  @Override
3469  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3470    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3471    addListener(getRegionLocation(regionName), (location, err) -> {
3472      if (err != null) {
3473        future.completeExceptionally(err);
3474        return;
3475      }
3476      ServerName serverName = location.getServerName();
3477      if (serverName == null) {
3478        future
3479          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3480        return;
3481      }
3482      addListener(
3483        this.<GetRegionInfoResponse> newAdminCaller()
3484          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3485            GetRegionInfoResponse> adminCall(controller, stub,
3486              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3487                true),
3488              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3489          .serverName(serverName).call(),
3490        (resp2, err2) -> {
3491          if (err2 != null) {
3492            future.completeExceptionally(err2);
3493          } else {
3494            if (resp2.hasCompactionState()) {
3495              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3496            } else {
3497              future.complete(CompactionState.NONE);
3498            }
3499          }
3500        });
3501    });
3502    return future;
3503  }
3504
3505  @Override
3506  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3507    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3508      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3509    return this.<Optional<Long>> newMasterCaller()
3510      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3511        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3512          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3513          ProtobufUtil::toOptionalTimestamp))
3514      .call();
3515  }
3516
3517  @Override
3518  public CompletableFuture<Optional<Long>>
3519    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3520    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3521    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3522    addListener(getRegionInfo(regionName), (region, err) -> {
3523      if (err != null) {
3524        future.completeExceptionally(err);
3525        return;
3526      }
3527      MajorCompactionTimestampForRegionRequest.Builder builder =
3528        MajorCompactionTimestampForRegionRequest.newBuilder();
3529      builder.setRegion(
3530        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3531      addListener(this.<Optional<Long>> newMasterCaller()
3532        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3533          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3534            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3535            ProtobufUtil::toOptionalTimestamp))
3536        .call(), (timestamp, err2) -> {
3537          if (err2 != null) {
3538            future.completeExceptionally(err2);
3539          } else {
3540            future.complete(timestamp);
3541          }
3542        });
3543    });
3544    return future;
3545  }
3546
3547  @Override
3548  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3549    List<String> serverNamesList) {
3550    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3551    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3552      if (err != null) {
3553        future.completeExceptionally(err);
3554        return;
3555      }
3556      // Accessed by multiple threads.
3557      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3558      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3559      serverNames.stream().forEach(serverName -> {
3560        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3561          if (err2 != null) {
3562            future.completeExceptionally(unwrapCompletionException(err2));
3563          } else {
3564            serverStates.put(serverName, serverState);
3565          }
3566        }));
3567      });
3568      addListener(
3569        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3570        (ret, err3) -> {
3571          if (!future.isCompletedExceptionally()) {
3572            if (err3 != null) {
3573              future.completeExceptionally(err3);
3574            } else {
3575              future.complete(serverStates);
3576            }
3577          }
3578        });
3579    });
3580    return future;
3581  }
3582
3583  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3584    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3585    if (serverNamesList.isEmpty()) {
3586      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3587        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3588      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3589        if (err != null) {
3590          future.completeExceptionally(err);
3591        } else {
3592          future.complete(clusterMetrics.getServersName());
3593        }
3594      });
3595      return future;
3596    } else {
3597      List<ServerName> serverList = new ArrayList<>();
3598      for (String regionServerName : serverNamesList) {
3599        ServerName serverName = null;
3600        try {
3601          serverName = ServerName.valueOf(regionServerName);
3602        } catch (Exception e) {
3603          future.completeExceptionally(
3604            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3605        }
3606        if (serverName == null) {
3607          future.completeExceptionally(
3608            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3609        } else {
3610          serverList.add(serverName);
3611        }
3612      }
3613      future.complete(serverList);
3614    }
3615    return future;
3616  }
3617
3618  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3619    return this.<Boolean> newAdminCaller().serverName(serverName)
3620      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3621        Boolean> adminCall(controller, stub,
3622          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3623          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3624      .call();
3625  }
3626
3627  @Override
3628  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3629    return this.<Boolean> newMasterCaller()
3630      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3631        Boolean> call(controller, stub,
3632          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3633          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3634          (resp) -> resp.getPrevBalanceValue()))
3635      .call();
3636  }
3637
3638  @Override
3639  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3640    return this.<BalanceResponse> newMasterCaller()
3641      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3642        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3643          (s, c, req, done) -> s.balance(c, req, done),
3644          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3645      .call();
3646  }
3647
3648  @Override
3649  public CompletableFuture<Boolean> isBalancerEnabled() {
3650    return this.<Boolean> newMasterCaller()
3651      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3652        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3653          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3654      .call();
3655  }
3656
3657  @Override
3658  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3659    return this.<Boolean> newMasterCaller()
3660      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3661        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3662          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3663          (resp) -> resp.getPrevNormalizerValue()))
3664      .call();
3665  }
3666
3667  @Override
3668  public CompletableFuture<Boolean> isNormalizerEnabled() {
3669    return this.<Boolean> newMasterCaller()
3670      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3671        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3672          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3673      .call();
3674  }
3675
3676  @Override
3677  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3678    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3679  }
3680
3681  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3682    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3683      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3684  }
3685
3686  @Override
3687  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3688    return this.<Boolean> newMasterCaller()
3689      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3690        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3691          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3692          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3693          (resp) -> resp.getPrevValue()))
3694      .call();
3695  }
3696
3697  @Override
3698  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3699    return this.<Boolean> newMasterCaller()
3700      .action(
3701        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3702          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3703            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3704      .call();
3705  }
3706
3707  @Override
3708  public CompletableFuture<Boolean> runCleanerChore() {
3709    return this.<Boolean> newMasterCaller()
3710      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3711        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3712          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3713          (resp) -> resp.getCleanerChoreRan()))
3714      .call();
3715  }
3716
3717  @Override
3718  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3719    return this.<Boolean> newMasterCaller()
3720      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3721        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3722          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3723      .call();
3724  }
3725
3726  @Override
3727  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3728    return this.<Boolean> newMasterCaller()
3729      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3730        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3731          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3732          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3733      .call();
3734  }
3735
3736  @Override
3737  public CompletableFuture<Integer> runCatalogJanitor() {
3738    return this.<Integer> newMasterCaller()
3739      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3740        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3741          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3742      .call();
3743  }
3744
3745  @Override
3746  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3747    ServiceCaller<S, R> callable) {
3748    MasterCoprocessorRpcChannelImpl channel =
3749      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3750    S stub = stubMaker.apply(channel);
3751    CompletableFuture<R> future = new CompletableFuture<>();
3752    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3753    callable.call(stub, controller, resp -> {
3754      if (controller.failed()) {
3755        future.completeExceptionally(controller.getFailed());
3756      } else {
3757        future.complete(resp);
3758      }
3759    });
3760    return future;
3761  }
3762
3763  @Override
3764  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3765    ServiceCaller<S, R> callable, ServerName serverName) {
3766    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3767      this.<Message> newServerCaller().serverName(serverName));
3768    S stub = stubMaker.apply(channel);
3769    CompletableFuture<R> future = new CompletableFuture<>();
3770    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3771    callable.call(stub, controller, resp -> {
3772      if (controller.failed()) {
3773        future.completeExceptionally(controller.getFailed());
3774      } else {
3775        future.complete(resp);
3776      }
3777    });
3778    return future;
3779  }
3780
3781  @Override
3782  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3783    return this.<List<ServerName>> newMasterCaller()
3784      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3785        List<ServerName>> call(controller, stub,
3786          RequestConverter.buildClearDeadServersRequest(servers),
3787          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3788          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3789      .call();
3790  }
3791
3792  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3793    return this.connection.callerFactory.<T> serverRequest()
3794      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3795      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3796      .pause(pauseNs, TimeUnit.NANOSECONDS)
3797      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3798      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3799  }
3800
3801  @Override
3802  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3803    if (tableName == null) {
3804      return failedFuture(new IllegalArgumentException("Table name is null"));
3805    }
3806    CompletableFuture<Void> future = new CompletableFuture<>();
3807    addListener(tableExists(tableName), (exist, err) -> {
3808      if (err != null) {
3809        future.completeExceptionally(err);
3810        return;
3811      }
3812      if (!exist) {
3813        future.completeExceptionally(new TableNotFoundException(
3814          "Table '" + tableName.getNameAsString() + "' does not exists."));
3815        return;
3816      }
3817      addListener(getTableSplits(tableName), (splits, err1) -> {
3818        if (err1 != null) {
3819          future.completeExceptionally(err1);
3820        } else {
3821          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3822            if (err2 != null) {
3823              future.completeExceptionally(err2);
3824            } else {
3825              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3826                if (err3 != null) {
3827                  future.completeExceptionally(err3);
3828                } else {
3829                  future.complete(result3);
3830                }
3831              });
3832            }
3833          });
3834        }
3835      });
3836    });
3837    return future;
3838  }
3839
3840  @Override
3841  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3842    if (tableName == null) {
3843      return failedFuture(new IllegalArgumentException("Table name is null"));
3844    }
3845    CompletableFuture<Void> future = new CompletableFuture<>();
3846    addListener(tableExists(tableName), (exist, err) -> {
3847      if (err != null) {
3848        future.completeExceptionally(err);
3849        return;
3850      }
3851      if (!exist) {
3852        future.completeExceptionally(new TableNotFoundException(
3853          "Table '" + tableName.getNameAsString() + "' does not exists."));
3854        return;
3855      }
3856      addListener(setTableReplication(tableName, false), (result, err2) -> {
3857        if (err2 != null) {
3858          future.completeExceptionally(err2);
3859        } else {
3860          future.complete(result);
3861        }
3862      });
3863    });
3864    return future;
3865  }
3866
3867  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3868    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3869    addListener(
3870      getRegions(tableName).thenApply(regions -> regions.stream()
3871        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3872      (regions, err2) -> {
3873        if (err2 != null) {
3874          future.completeExceptionally(err2);
3875          return;
3876        }
3877        if (regions.size() == 1) {
3878          future.complete(null);
3879        } else {
3880          byte[][] splits = new byte[regions.size() - 1][];
3881          for (int i = 1; i < regions.size(); i++) {
3882            splits[i - 1] = regions.get(i).getStartKey();
3883          }
3884          future.complete(splits);
3885        }
3886      });
3887    return future;
3888  }
3889
3890  /**
3891   * Connect to peer and check the table descriptor on peer:
3892   * <ol>
3893   * <li>Create the same table on peer when not exist.</li>
3894   * <li>Throw an exception if the table already has replication enabled on any of the column
3895   * families.</li>
3896   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3897   * </ol>
3898   * @param tableName name of the table to sync to the peer
3899   * @param splits    table split keys
3900   */
3901  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3902    byte[][] splits) {
3903    CompletableFuture<Void> future = new CompletableFuture<>();
3904    addListener(listReplicationPeers(), (peers, err) -> {
3905      if (err != null) {
3906        future.completeExceptionally(err);
3907        return;
3908      }
3909      if (peers == null || peers.size() <= 0) {
3910        future.completeExceptionally(
3911          new IllegalArgumentException("Found no peer cluster for replication."));
3912        return;
3913      }
3914      List<CompletableFuture<Void>> futures = new ArrayList<>();
3915      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3916        .forEach(peer -> {
3917          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3918        });
3919      addListener(
3920        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3921        (result, err2) -> {
3922          if (err2 != null) {
3923            future.completeExceptionally(err2);
3924          } else {
3925            future.complete(result);
3926          }
3927        });
3928    });
3929    return future;
3930  }
3931
3932  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3933    ReplicationPeerDescription peer) {
3934    Configuration peerConf;
3935    try {
3936      peerConf = ReplicationPeerConfigUtil
3937        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3938    } catch (IOException e) {
3939      return failedFuture(e);
3940    }
3941    URI connectionUri =
3942      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3943    CompletableFuture<Void> future = new CompletableFuture<>();
3944    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3945      if (err != null) {
3946        future.completeExceptionally(err);
3947        return;
3948      }
3949      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3950        if (err1 != null) {
3951          future.completeExceptionally(err1);
3952          return;
3953        }
3954        AsyncAdmin peerAdmin = conn.getAdmin();
3955        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3956          if (err2 != null) {
3957            future.completeExceptionally(err2);
3958            return;
3959          }
3960          if (!exist) {
3961            CompletableFuture<Void> createTableFuture = null;
3962            if (splits == null) {
3963              createTableFuture = peerAdmin.createTable(tableDesc);
3964            } else {
3965              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3966            }
3967            addListener(createTableFuture, (result, err3) -> {
3968              if (err3 != null) {
3969                future.completeExceptionally(err3);
3970              } else {
3971                future.complete(result);
3972              }
3973            });
3974          } else {
3975            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3976              (result, err4) -> {
3977                if (err4 != null) {
3978                  future.completeExceptionally(err4);
3979                } else {
3980                  future.complete(result);
3981                }
3982              });
3983          }
3984        });
3985      });
3986    });
3987    return future;
3988  }
3989
3990  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3991    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3992    CompletableFuture<Void> future = new CompletableFuture<>();
3993    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3994      if (err != null) {
3995        future.completeExceptionally(err);
3996        return;
3997      }
3998      if (peerTableDesc == null) {
3999        future.completeExceptionally(
4000          new IllegalArgumentException("Failed to get table descriptor for table "
4001            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
4002        return;
4003      }
4004      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
4005        future.completeExceptionally(new IllegalArgumentException(
4006          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
4007            + ", but the table descriptors are not same when compared with source cluster."
4008            + " Thus can not enable the table's replication switch."));
4009        return;
4010      }
4011      future.complete(null);
4012    });
4013    return future;
4014  }
4015
4016  /**
4017   * Set the table's replication switch if the table's replication switch is already not set.
4018   * @param tableName name of the table
4019   * @param enableRep is replication switch enable or disable
4020   */
4021  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
4022    CompletableFuture<Void> future = new CompletableFuture<>();
4023    addListener(getDescriptor(tableName), (tableDesc, err) -> {
4024      if (err != null) {
4025        future.completeExceptionally(err);
4026        return;
4027      }
4028      if (!tableDesc.matchReplicationScope(enableRep)) {
4029        int scope =
4030          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
4031        TableDescriptor newTableDesc =
4032          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
4033        addListener(modifyTable(newTableDesc), (result, err2) -> {
4034          if (err2 != null) {
4035            future.completeExceptionally(err2);
4036          } else {
4037            future.complete(result);
4038          }
4039        });
4040      } else {
4041        future.complete(null);
4042      }
4043    });
4044    return future;
4045  }
4046
4047  @Override
4048  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
4049    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
4050    request.setPeerId(peerId);
4051    return this.<Boolean> newMasterCaller()
4052      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
4053        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
4054          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
4055          resp -> resp.getIsEnabled()))
4056      .call();
4057  }
4058
4059  private void waitUntilAllReplicationPeerModificationProceduresDone(
4060    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
4061    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
4062      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
4063        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
4064          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
4065            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
4066            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
4067            resp -> resp.getProcedureList()))
4068        .call();
4069    addListener(callFuture, (r, e) -> {
4070      if (e != null) {
4071        future.completeExceptionally(e);
4072      } else if (r.isEmpty()) {
4073        // we are done
4074        future.complete(prevOn);
4075      } else {
4076        // retry later to see if the procedures are done
4077        retryTimer.newTimeout(
4078          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
4079          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
4080      }
4081    });
4082  }
4083
4084  @Override
4085  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
4086    boolean drainProcedures) {
4087    ReplicationPeerModificationSwitchRequest request =
4088      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
4089    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
4090      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
4091        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
4092          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
4093          resp -> resp.getPreviousValue()))
4094      .call();
4095    // if we do not need to wait all previous peer modification procedure done, or we are enabling
4096    // peer modification, just return here.
4097    if (!drainProcedures || on) {
4098      return callFuture;
4099    }
4100    // otherwise we need to wait until all previous peer modification procedure done
4101    CompletableFuture<Boolean> future = new CompletableFuture<>();
4102    addListener(callFuture, (prevOn, err) -> {
4103      if (err != null) {
4104        future.completeExceptionally(err);
4105        return;
4106      }
4107      // even if the previous state is disabled, we still need to wait here, as there could be
4108      // another client thread which called this method just before us and have already changed the
4109      // state to off, but there are still peer modification procedures not finished, so we should
4110      // also wait here.
4111      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
4112    });
4113    return future;
4114  }
4115
4116  @Override
4117  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
4118    return this.<Boolean> newMasterCaller()
4119      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
4120        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
4121          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
4122          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
4123          (resp) -> resp.getEnabled()))
4124      .call();
4125  }
4126
4127  @Override
4128  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4129    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4130    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4131      if (err != null) {
4132        future.completeExceptionally(err);
4133        return;
4134      }
4135      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4136        locations.stream().filter(l -> l.getRegion() != null)
4137          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4138          .collect(Collectors.groupingBy(l -> l.getServerName(),
4139            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4140      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4141      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4142      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4143        futures
4144          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4145            if (err2 != null) {
4146              future.completeExceptionally(unwrapCompletionException(err2));
4147            } else {
4148              aggregator.append(stats);
4149            }
4150          }));
4151      }
4152      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4153        (ret, err3) -> {
4154          if (err3 != null) {
4155            future.completeExceptionally(unwrapCompletionException(err3));
4156          } else {
4157            future.complete(aggregator.sum());
4158          }
4159        });
4160    });
4161    return future;
4162  }
4163
4164  @Override
4165  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4166    boolean preserveSplits) {
4167    CompletableFuture<Void> future = new CompletableFuture<>();
4168    addListener(tableExists(tableName), (exist, err) -> {
4169      if (err != null) {
4170        future.completeExceptionally(err);
4171        return;
4172      }
4173      if (!exist) {
4174        future.completeExceptionally(new TableNotFoundException(tableName));
4175        return;
4176      }
4177      addListener(tableExists(newTableName), (exist1, err1) -> {
4178        if (err1 != null) {
4179          future.completeExceptionally(err1);
4180          return;
4181        }
4182        if (exist1) {
4183          future.completeExceptionally(new TableExistsException(newTableName));
4184          return;
4185        }
4186        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4187          if (err2 != null) {
4188            future.completeExceptionally(err2);
4189            return;
4190          }
4191          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4192          if (preserveSplits) {
4193            addListener(getTableSplits(tableName), (splits, err3) -> {
4194              if (err3 != null) {
4195                future.completeExceptionally(err3);
4196              } else {
4197                addListener(
4198                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4199                  (result, err4) -> {
4200                    if (err4 != null) {
4201                      future.completeExceptionally(err4);
4202                    } else {
4203                      future.complete(result);
4204                    }
4205                  });
4206              }
4207            });
4208          } else {
4209            addListener(createTable(newTableDesc), (result, err5) -> {
4210              if (err5 != null) {
4211                future.completeExceptionally(err5);
4212              } else {
4213                future.complete(result);
4214              }
4215            });
4216          }
4217        });
4218      });
4219    });
4220    return future;
4221  }
4222
4223  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4224    List<RegionInfo> hris) {
4225    return this.<CacheEvictionStats> newAdminCaller()
4226      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4227        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4228          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4229          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4230          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4231      .serverName(serverName).call();
4232  }
4233
4234  @Override
4235  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4236    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4237      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4238        Boolean> call(controller, stub,
4239          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4240          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4241          resp -> resp.getPreviousRpcThrottleEnabled()))
4242      .call();
4243    return future;
4244  }
4245
4246  @Override
4247  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4248    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4249      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4250        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4251          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4252          resp -> resp.getRpcThrottleEnabled()))
4253      .call();
4254    return future;
4255  }
4256
4257  @Override
4258  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4259    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4260      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4261        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4262          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4263            .build(),
4264          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4265          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4266      .call();
4267    return future;
4268  }
4269
4270  @Override
4271  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4272    return this.<Map<TableName, Long>> newMasterCaller()
4273      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4274        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4275          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4276          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4277          resp -> resp.getSizesList().stream().collect(Collectors
4278            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4279      .call();
4280  }
4281
4282  @Override
4283  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4284    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4285    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4286      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4287        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4288          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4289          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4290          resp -> resp.getSnapshotsList().stream()
4291            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4292              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4293      .serverName(serverName).call();
4294  }
4295
4296  private CompletableFuture<SpaceQuotaSnapshot>
4297    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4298    return this.<SpaceQuotaSnapshot> newMasterCaller()
4299      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4300        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4301          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4302      .call();
4303  }
4304
4305  @Override
4306  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4307    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4308      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4309      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4310  }
4311
4312  @Override
4313  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4314    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4315    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4316      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4317      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4318  }
4319
4320  @Override
4321  public CompletableFuture<Void> grant(UserPermission userPermission,
4322    boolean mergeExistingPermissions) {
4323    return this.<Void> newMasterCaller()
4324      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4325        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4326        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4327      .call();
4328  }
4329
4330  @Override
4331  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4332    return this.<Void> newMasterCaller()
4333      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4334        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4335        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4336      .call();
4337  }
4338
4339  @Override
4340  public CompletableFuture<List<UserPermission>>
4341    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4342    return this.<List<UserPermission>> newMasterCaller()
4343      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4344        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4345          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4346          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4347          resp -> resp.getUserPermissionList().stream()
4348            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4349            .collect(Collectors.toList())))
4350      .call();
4351  }
4352
4353  @Override
4354  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4355    List<Permission> permissions) {
4356    return this.<List<Boolean>> newMasterCaller()
4357      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4358        List<Boolean>> call(controller, stub,
4359          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4360          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4361          resp -> resp.getHasUserPermissionList()))
4362      .call();
4363  }
4364
4365  @Override
4366  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4367    return this.<Boolean> newMasterCaller()
4368      .action((controller, stub) -> this.call(controller, stub,
4369        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4370        MasterService.Interface::switchSnapshotCleanup,
4371        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4372      .call();
4373  }
4374
4375  @Override
4376  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4377    return this.<Boolean> newMasterCaller()
4378      .action((controller, stub) -> this.call(controller, stub,
4379        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4380        MasterService.Interface::isSnapshotCleanupEnabled,
4381        IsSnapshotCleanupEnabledResponse::getEnabled))
4382      .call();
4383  }
4384
4385  @Override
4386  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4387    return this.<Void> newMasterCaller()
4388      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4389        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4390        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4391      .call();
4392  }
4393
4394  @Override
4395  public CompletableFuture<Void> addRSGroup(String groupName) {
4396    return this.<Void> newMasterCaller()
4397      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4398        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4399        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4400      .call();
4401  }
4402
4403  @Override
4404  public CompletableFuture<Void> removeRSGroup(String groupName) {
4405    return this.<Void> newMasterCaller()
4406      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4407        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4408        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4409      .call();
4410  }
4411
4412  @Override
4413  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4414    BalanceRequest request) {
4415    return this.<BalanceResponse> newMasterCaller()
4416      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4417        BalanceResponse> call(controller, stub,
4418          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4419          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4420      .call();
4421  }
4422
4423  @Override
4424  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4425    return this.<List<RSGroupInfo>> newMasterCaller()
4426      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4427        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4428          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4429            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4430      .call();
4431  }
4432
4433  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4434    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4435    final String logType) {
4436    if (CollectionUtils.isEmpty(serverNames)) {
4437      return CompletableFuture.completedFuture(Collections.emptyList());
4438    }
4439    return CompletableFuture.supplyAsync(() -> serverNames
4440      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4441        filterParams, limit, logType))
4442      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4443  }
4444
4445  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4446    Map<String, Object> filterParams, int limit, String logType) {
4447    return this.<List<LogEntry>> newAdminCaller()
4448      .action((controller, stub) -> this.adminCall(controller, stub,
4449        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4450        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4451      .serverName(serverName).call();
4452  }
4453
4454  @Override
4455  public CompletableFuture<List<Boolean>>
4456    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4457    if (CollectionUtils.isEmpty(serverNames)) {
4458      return CompletableFuture.completedFuture(Collections.emptyList());
4459    }
4460    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4461      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4462    return convertToFutureOfList(clearSlowLogResponseList);
4463  }
4464
4465  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4466    return this.<Boolean> newAdminCaller()
4467      .action((controller, stub) -> this.adminCall(controller, stub,
4468        RequestConverter.buildClearSlowLogResponseRequest(),
4469        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4470      .serverName(serverName).call();
4471  }
4472
4473  private static <T> CompletableFuture<List<T>>
4474    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4475    CompletableFuture<Void> allDoneFuture =
4476      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4477    return allDoneFuture
4478      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4479  }
4480
4481  @Override
4482  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4483    return this.<List<TableName>> newMasterCaller()
4484      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4485        List<TableName>> call(controller, stub,
4486          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4487          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4488            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4489      .call();
4490  }
4491
4492  @Override
4493  public CompletableFuture<Pair<List<String>, List<TableName>>>
4494    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4495    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4496      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4497        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4498        Pair<List<String>, List<TableName>>> call(controller, stub,
4499          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4500            .build(),
4501          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4502          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4503            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4504      .call();
4505  }
4506
4507  @Override
4508  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4509    return this.<RSGroupInfo> newMasterCaller()
4510      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4511        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4512          GetRSGroupInfoOfServerRequest.newBuilder()
4513            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4514              .setPort(hostPort.getPort()).build())
4515            .build(),
4516          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4517          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4518      .call();
4519  }
4520
4521  @Override
4522  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4523    return this.<Void> newMasterCaller()
4524      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4525        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4526        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4527      .call();
4528  }
4529
4530  @Override
4531  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4532    CompletableFuture<Void> future = new CompletableFuture<>();
4533    for (TableName tableName : tables) {
4534      addListener(tableExists(tableName), (exist, err) -> {
4535        if (err != null) {
4536          future.completeExceptionally(err);
4537          return;
4538        }
4539        if (!exist) {
4540          future.completeExceptionally(new TableNotFoundException(tableName));
4541          return;
4542        }
4543      });
4544    }
4545    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4546      if (err != null) {
4547        future.completeExceptionally(err);
4548        return;
4549      }
4550      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4551        future.complete(null);
4552        return;
4553      }
4554      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4555      for (TableDescriptor td : tableDescriptions) {
4556        newTableDescriptors
4557          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4558      }
4559      addListener(
4560        CompletableFuture.allOf(
4561          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4562        (v, e) -> {
4563          if (e != null) {
4564            future.completeExceptionally(e);
4565          } else {
4566            future.complete(v);
4567          }
4568        });
4569    });
4570    return future;
4571  }
4572
4573  @Override
4574  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4575    return this.<RSGroupInfo> newMasterCaller()
4576      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4577        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4578          GetRSGroupInfoOfTableRequest.newBuilder()
4579            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4580          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4581          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4582      .call();
4583  }
4584
4585  @Override
4586  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4587    return this.<RSGroupInfo> newMasterCaller()
4588      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4589        RSGroupInfo> call(controller, stub,
4590          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4591          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4592          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4593      .call();
4594  }
4595
4596  @Override
4597  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4598    return this.<Void> newMasterCaller()
4599      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4600        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4601          .setNewRsgroupName(newName).build(),
4602        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4603      .call();
4604  }
4605
4606  @Override
4607  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4608    Map<String, String> configuration) {
4609    UpdateRSGroupConfigRequest.Builder request =
4610      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4611    if (configuration != null) {
4612      configuration.entrySet().forEach(e -> request.addConfiguration(
4613        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4614    }
4615    return this.<Void> newMasterCaller()
4616      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4617        Void> call(controller, stub, request.build(),
4618          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4619      .call();
4620  }
4621
4622  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4623    return this.<List<LogEntry>> newMasterCaller()
4624      .action((controller, stub) -> this.call(controller, stub,
4625        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4626        ProtobufUtil::toBalancerDecisionResponse))
4627      .call();
4628  }
4629
4630  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4631    return this.<List<LogEntry>> newMasterCaller()
4632      .action((controller, stub) -> this.call(controller, stub,
4633        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4634        ProtobufUtil::toBalancerRejectionResponse))
4635      .call();
4636  }
4637
4638  @Override
4639  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4640    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4641    if (logType == null || serverType == null) {
4642      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4643    }
4644    switch (logType) {
4645      case "SLOW_LOG":
4646      case "LARGE_LOG":
4647        if (ServerType.MASTER.equals(serverType)) {
4648          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4649        }
4650        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4651      case "BALANCER_DECISION":
4652        if (ServerType.REGION_SERVER.equals(serverType)) {
4653          throw new IllegalArgumentException(
4654            "Balancer Decision logs are not maintained by HRegionServer");
4655        }
4656        return getBalancerDecisions(limit);
4657      case "BALANCER_REJECTION":
4658        if (ServerType.REGION_SERVER.equals(serverType)) {
4659          throw new IllegalArgumentException(
4660            "Balancer Rejection logs are not maintained by HRegionServer");
4661        }
4662        return getBalancerRejections(limit);
4663      default:
4664        return CompletableFuture.completedFuture(Collections.emptyList());
4665    }
4666  }
4667
4668  @Override
4669  public CompletableFuture<Void> flushMasterStore() {
4670    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4671    return this.<Void> newMasterCaller()
4672      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4673        Void> call(controller, stub, request.build(),
4674          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4675      .call();
4676  }
4677
4678  @Override
4679  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4680    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4681    return this.<List<String>> newAdminCaller()
4682      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4683        List<String>> adminCall(controller, stub, request.build(),
4684          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4685          resp -> resp.getCachedFilesList()))
4686      .serverName(serverName).call();
4687  }
4688
4689  @Override
4690  public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) {
4691    MasterProtos.RestoreBackupSystemTableRequest request =
4692      MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName)
4693        .build();
4694    return this.<MasterProtos.RestoreBackupSystemTableRequest,
4695      MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request,
4696        MasterService.Interface::restoreBackupSystemTable,
4697        MasterProtos.RestoreBackupSystemTableResponse::getProcId,
4698        new RestoreBackupSystemTableProcedureBiConsumer());
4699  }
4700}