001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104import org.apache.hbase.thirdparty.com.google.protobuf.Message;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
108import org.apache.hbase.thirdparty.io.netty.util.Timeout;
109import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
343
344/**
345 * The implementation of AsyncAdmin.
346 * <p>
347 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
348 * be finished inside the rpc framework thread, which means that the callbacks registered to the
349 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
350 * this class should not try to do time consuming tasks in the callbacks.
351 * @since 2.0.0
352 * @see AsyncHBaseAdmin
353 * @see AsyncConnection#getAdmin()
354 * @see AsyncConnection#getAdminBuilder()
355 */
356@InterfaceAudience.Private
357class RawAsyncHBaseAdmin implements AsyncAdmin {
358
359  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
360
361  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
362
363  private final AsyncConnectionImpl connection;
364
365  private final HashedWheelTimer retryTimer;
366
367  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
368
369  private final long rpcTimeoutNs;
370
371  private final long operationTimeoutNs;
372
373  private final long pauseNs;
374
375  private final long pauseNsForServerOverloaded;
376
377  private final int maxAttempts;
378
379  private final int startLogErrorsCnt;
380
381  private final NonceGenerator ng;
382
383  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
384    AsyncAdminBuilderBase builder) {
385    this.connection = connection;
386    this.retryTimer = retryTimer;
387    this.metaTable = connection.getTable(META_TABLE_NAME);
388    this.rpcTimeoutNs = builder.rpcTimeoutNs;
389    this.operationTimeoutNs = builder.operationTimeoutNs;
390    this.pauseNs = builder.pauseNs;
391    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
392      LOG.warn(
393        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
394          + " the normal pause value {} ms, use the greater one instead",
395        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
396        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
397      this.pauseNsForServerOverloaded = builder.pauseNs;
398    } else {
399      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
400    }
401    this.maxAttempts = builder.maxAttempts;
402    this.startLogErrorsCnt = builder.startLogErrorsCnt;
403    this.ng = connection.getNonceGenerator();
404  }
405
406  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
407    return this.connection.callerFactory.<T> masterRequest()
408      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
409      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
410      .pause(pauseNs, TimeUnit.NANOSECONDS)
411      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
412      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
413  }
414
415  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
416    return this.connection.callerFactory.<T> adminRequest()
417      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
418      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
419      .pause(pauseNs, TimeUnit.NANOSECONDS)
420      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
421      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
422  }
423
424  @FunctionalInterface
425  private interface MasterRpcCall<RESP, REQ> {
426    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
427      RpcCallback<RESP> done);
428  }
429
430  @FunctionalInterface
431  private interface AdminRpcCall<RESP, REQ> {
432    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
433      RpcCallback<RESP> done);
434  }
435
436  @FunctionalInterface
437  private interface Converter<D, S> {
438    D convert(S src) throws IOException;
439  }
440
441  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
442    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
443    Converter<RESP, PRESP> respConverter) {
444    CompletableFuture<RESP> future = new CompletableFuture<>();
445    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
446
447      @Override
448      public void run(PRESP resp) {
449        if (controller.failed()) {
450          future.completeExceptionally(controller.getFailed());
451        } else {
452          try {
453            future.complete(respConverter.convert(resp));
454          } catch (IOException e) {
455            future.completeExceptionally(e);
456          }
457        }
458      }
459    });
460    return future;
461  }
462
463  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
464    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
465    Converter<RESP, PRESP> respConverter) {
466    CompletableFuture<RESP> future = new CompletableFuture<>();
467    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
468
469      @Override
470      public void run(PRESP resp) {
471        if (controller.failed()) {
472          future.completeExceptionally(controller.getFailed());
473        } else {
474          try {
475            future.complete(respConverter.convert(resp));
476          } catch (IOException e) {
477            future.completeExceptionally(e);
478          }
479        }
480      }
481    });
482    return future;
483  }
484
485  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
486    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
487    ProcedureBiConsumer consumer) {
488    return procedureCall(b -> {
489    }, preq, rpcCall, respConverter, consumer);
490  }
491
492  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
493    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
494    ProcedureBiConsumer consumer) {
495    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
496  }
497
498  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
499    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
500    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
501    ProcedureBiConsumer consumer) {
502    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
503      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
504    prioritySetter.accept(builder);
505    CompletableFuture<Long> procFuture = builder.call();
506    CompletableFuture<Void> future = waitProcedureResult(procFuture);
507    addListener(future, consumer);
508    return future;
509  }
510
511  @Override
512  public CompletableFuture<Boolean> tableExists(TableName tableName) {
513    if (TableName.isMetaTableName(tableName)) {
514      return CompletableFuture.completedFuture(true);
515    }
516    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
517  }
518
519  @Override
520  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
521    return getTableDescriptors(
522      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
523  }
524
525  /**
526   * {@link #listTableDescriptors(boolean)}
527   */
528  @Override
529  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
530    boolean includeSysTables) {
531    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
532      + "use listTableDescriptors(boolean) instead");
533    return getTableDescriptors(
534      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
535  }
536
537  @Override
538  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
539    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
540      + "use listTableDescriptors(boolean) instead");
541    if (tableNames.isEmpty()) {
542      return CompletableFuture.completedFuture(Collections.emptyList());
543    }
544    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
545  }
546
547  private CompletableFuture<List<TableDescriptor>>
548    getTableDescriptors(GetTableDescriptorsRequest request) {
549    return this.<List<TableDescriptor>> newMasterCaller()
550      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
551        List<TableDescriptor>> call(controller, stub, request,
552          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
553          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
554      .call();
555  }
556
557  @Override
558  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
559    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
560  }
561
562  @Override
563  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
564    boolean includeSysTables) {
565    Preconditions.checkNotNull(pattern,
566      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
567    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
568  }
569
570  @Override
571  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
572    return this.<List<TableName>> newMasterCaller()
573      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
574        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
575          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
576          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
577          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
578      .call();
579  }
580
581  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
582    return this.<List<TableName>> newMasterCaller()
583      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
584        List<TableName>> call(controller, stub, request,
585          (s, c, req, done) -> s.getTableNames(c, req, done),
586          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
587      .call();
588  }
589
590  @Override
591  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
592    return this.<List<TableDescriptor>> newMasterCaller()
593      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
594        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
595          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
596          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
597          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
598      .call();
599  }
600
601  @Override
602  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
603    return this.<List<TableDescriptor>> newMasterCaller()
604      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
605        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
606          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
607          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
608          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
609      .call();
610  }
611
612  @Override
613  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
614    return this.<List<TableName>> newMasterCaller()
615      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
616        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
617          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
618          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
619          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
620      .call();
621  }
622
623  @Override
624  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
625    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
626    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
627      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
628        List<TableSchema>> call(controller, stub,
629          RequestConverter.buildGetTableDescriptorsRequest(tableName),
630          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
631          (resp) -> resp.getTableSchemaList()))
632      .call(), (tableSchemas, error) -> {
633        if (error != null) {
634          future.completeExceptionally(error);
635          return;
636        }
637        if (!tableSchemas.isEmpty()) {
638          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
639        } else {
640          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
641        }
642      });
643    return future;
644  }
645
646  @Override
647  public CompletableFuture<Void> createTable(TableDescriptor desc) {
648    return createTable(desc.getTableName(),
649      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
650  }
651
652  @Override
653  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
654    int numRegions) {
655    try {
656      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
657    } catch (IllegalArgumentException e) {
658      return failedFuture(e);
659    }
660  }
661
662  @Override
663  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
664    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
665      + " use createTable(TableDescriptor) instead");
666    try {
667      verifySplitKeys(splitKeys);
668      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
669        splitKeys, ng.getNonceGroup(), ng.newNonce()));
670    } catch (IllegalArgumentException e) {
671      return failedFuture(e);
672    }
673  }
674
675  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
676    Preconditions.checkNotNull(tableName, "table name is null");
677    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
678      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
679      new CreateTableProcedureBiConsumer(tableName));
680  }
681
682  @Override
683  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
684    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
685      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
686        ng.newNonce()),
687      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
688      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
689  }
690
691  @Override
692  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
693    return this.<ModifyTableStoreFileTrackerRequest,
694      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
695        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
696          ng.getNonceGroup(), ng.newNonce()),
697        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
698        (resp) -> resp.getProcId(),
699        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
700  }
701
702  @Override
703  public CompletableFuture<Void> deleteTable(TableName tableName) {
704    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
705      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
706      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
707      new DeleteTableProcedureBiConsumer(tableName));
708  }
709
710  @Override
711  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
712    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
713      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
714        ng.newNonce()),
715      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
716      new TruncateTableProcedureBiConsumer(tableName));
717  }
718
719  @Override
720  public CompletableFuture<Void> enableTable(TableName tableName) {
721    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
722      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
723      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
724      new EnableTableProcedureBiConsumer(tableName));
725  }
726
727  @Override
728  public CompletableFuture<Void> disableTable(TableName tableName) {
729    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
730      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
731      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
732      new DisableTableProcedureBiConsumer(tableName));
733  }
734
735  /**
736   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
737   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
738   * targetState).
739   */
740  private static CompletableFuture<Boolean> completeCheckTableState(
741    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
742    TableState.State targetState, TableName tableName) {
743    if (error != null) {
744      future.completeExceptionally(error);
745    } else {
746      if (tableState != null) {
747        future.complete(tableState.inStates(targetState));
748      } else {
749        future.completeExceptionally(new TableNotFoundException(tableName));
750      }
751    }
752    return future;
753  }
754
755  @Override
756  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
757    if (TableName.isMetaTableName(tableName)) {
758      return CompletableFuture.completedFuture(true);
759    }
760    CompletableFuture<Boolean> future = new CompletableFuture<>();
761    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
762      (tableState, error) -> {
763        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
764          TableState.State.ENABLED, tableName);
765      });
766    return future;
767  }
768
769  @Override
770  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
771    if (TableName.isMetaTableName(tableName)) {
772      return CompletableFuture.completedFuture(false);
773    }
774    CompletableFuture<Boolean> future = new CompletableFuture<>();
775    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
776      (tableState, error) -> {
777        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
778          TableState.State.DISABLED, tableName);
779      });
780    return future;
781  }
782
783  @Override
784  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
785    if (TableName.isMetaTableName(tableName)) {
786      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
787        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
788    }
789    CompletableFuture<Boolean> future = new CompletableFuture<>();
790    addListener(isTableEnabled(tableName), (enabled, error) -> {
791      if (error != null) {
792        if (error instanceof TableNotFoundException) {
793          future.complete(false);
794        } else {
795          future.completeExceptionally(error);
796        }
797        return;
798      }
799      if (!enabled) {
800        future.complete(false);
801      } else {
802        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
803          (locations, error1) -> {
804            if (error1 != null) {
805              future.completeExceptionally(error1);
806              return;
807            }
808            List<HRegionLocation> notDeployedRegions = locations.stream()
809              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
810            if (notDeployedRegions.size() > 0) {
811              if (LOG.isDebugEnabled()) {
812                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
813              }
814              future.complete(false);
815              return;
816            }
817            future.complete(true);
818          });
819      }
820    });
821    return future;
822  }
823
824  @Override
825  public CompletableFuture<Void> addColumnFamily(TableName tableName,
826    ColumnFamilyDescriptor columnFamily) {
827    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
828      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
829        ng.newNonce()),
830      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
831      new AddColumnFamilyProcedureBiConsumer(tableName));
832  }
833
834  @Override
835  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
836    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
837      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
838        ng.newNonce()),
839      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
840      new DeleteColumnFamilyProcedureBiConsumer(tableName));
841  }
842
843  @Override
844  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
845    ColumnFamilyDescriptor columnFamily) {
846    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
847      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
848        ng.newNonce()),
849      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
850      new ModifyColumnFamilyProcedureBiConsumer(tableName));
851  }
852
853  @Override
854  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
855    byte[] family, String dstSFT) {
856    return this.<ModifyColumnStoreFileTrackerRequest,
857      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
858        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
859          ng.getNonceGroup(), ng.newNonce()),
860        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
861        (resp) -> resp.getProcId(),
862        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
863  }
864
865  @Override
866  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
867    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
868      RequestConverter.buildCreateNamespaceRequest(descriptor),
869      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
870      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
871  }
872
873  @Override
874  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
875    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
876      RequestConverter.buildModifyNamespaceRequest(descriptor),
877      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
878      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
879  }
880
881  @Override
882  public CompletableFuture<Void> deleteNamespace(String name) {
883    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
884      RequestConverter.buildDeleteNamespaceRequest(name),
885      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
886      new DeleteNamespaceProcedureBiConsumer(name));
887  }
888
889  @Override
890  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
891    return this.<NamespaceDescriptor> newMasterCaller()
892      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
893        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
894          RequestConverter.buildGetNamespaceDescriptorRequest(name),
895          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
896          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
897      .call();
898  }
899
900  @Override
901  public CompletableFuture<List<String>> listNamespaces() {
902    return this.<List<String>> newMasterCaller()
903      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
904        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
905          (s, c, req, done) -> s.listNamespaces(c, req, done),
906          (resp) -> resp.getNamespaceNameList()))
907      .call();
908  }
909
910  @Override
911  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
912    return this.<List<NamespaceDescriptor>> newMasterCaller()
913      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
914        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
915          ListNamespaceDescriptorsRequest.newBuilder().build(),
916          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
917          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
918      .call();
919  }
920
921  @Override
922  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
923    return this.<List<RegionInfo>> newAdminCaller()
924      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
925        List<RegionInfo>> adminCall(controller, stub,
926          RequestConverter.buildGetOnlineRegionRequest(),
927          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
928          resp -> ProtobufUtil.getRegionInfos(resp)))
929      .serverName(serverName).call();
930  }
931
932  @Override
933  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
934    if (tableName.equals(META_TABLE_NAME)) {
935      return connection.registry.getMetaRegionLocations()
936        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
937          .collect(Collectors.toList()));
938    } else {
939      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
940        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
941    }
942  }
943
944  @Override
945  public CompletableFuture<Void> flush(TableName tableName) {
946    return flush(tableName, null);
947  }
948
949  @Override
950  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
951    CompletableFuture<Void> future = new CompletableFuture<>();
952    addListener(tableExists(tableName), (exists, err) -> {
953      if (err != null) {
954        future.completeExceptionally(err);
955      } else if (!exists) {
956        future.completeExceptionally(new TableNotFoundException(tableName));
957      } else {
958        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
959          if (err2 != null) {
960            future.completeExceptionally(err2);
961          } else if (!tableEnabled) {
962            future.completeExceptionally(new TableNotEnabledException(tableName));
963          } else {
964            Map<String, String> props = new HashMap<>();
965            if (columnFamily != null) {
966              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
967            }
968            addListener(
969              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
970              (ret, err3) -> {
971                if (err3 != null) {
972                  future.completeExceptionally(err3);
973                } else {
974                  future.complete(ret);
975                }
976              });
977          }
978        });
979      }
980    });
981    return future;
982  }
983
984  @Override
985  public CompletableFuture<Void> flushRegion(byte[] regionName) {
986    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
987    });
988  }
989
990  @Override
991  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
992    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
993      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
994    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
995    });
996  }
997
998  /**
999   * This method is for internal use only, where we need the response of the flush.
1000   * <p/>
1001   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1002   * API.
1003   */
1004  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1005    boolean writeFlushWALMarker) {
1006    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1007    addListener(getRegionLocation(regionName), (location, err) -> {
1008      if (err != null) {
1009        future.completeExceptionally(err);
1010        return;
1011      }
1012      ServerName serverName = location.getServerName();
1013      if (serverName == null) {
1014        future
1015          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1016        return;
1017      }
1018      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1019        (ret, err2) -> {
1020          if (err2 != null) {
1021            future.completeExceptionally(err2);
1022          } else {
1023            future.complete(ret);
1024          }
1025        });
1026    });
1027    return future;
1028  }
1029
1030  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1031    byte[] columnFamily, boolean writeFlushWALMarker) {
1032    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1033      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1034        FlushRegionResponse> adminCall(controller, stub,
1035          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1036            writeFlushWALMarker),
1037          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1038      .call();
1039  }
1040
1041  @Override
1042  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1043    CompletableFuture<Void> future = new CompletableFuture<>();
1044    addListener(getRegions(sn), (hRegionInfos, err) -> {
1045      if (err != null) {
1046        future.completeExceptionally(err);
1047        return;
1048      }
1049      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1050      if (hRegionInfos != null) {
1051        hRegionInfos
1052          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1053          })));
1054      }
1055      addListener(CompletableFuture.allOf(
1056        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1057          if (err2 != null) {
1058            future.completeExceptionally(err2);
1059          } else {
1060            future.complete(ret);
1061          }
1062        });
1063    });
1064    return future;
1065  }
1066
1067  @Override
1068  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1069    return compact(tableName, null, false, compactType);
1070  }
1071
1072  @Override
1073  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1074    CompactType compactType) {
1075    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1076      + "If you don't specify a columnFamily, use compact(TableName) instead");
1077    return compact(tableName, columnFamily, false, compactType);
1078  }
1079
1080  @Override
1081  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1082    return compactRegion(regionName, null, false);
1083  }
1084
1085  @Override
1086  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1087    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1088      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1089    return compactRegion(regionName, columnFamily, false);
1090  }
1091
1092  @Override
1093  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1094    return compact(tableName, null, true, compactType);
1095  }
1096
1097  @Override
1098  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1099    CompactType compactType) {
1100    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1101      + "If you don't specify a columnFamily, use compact(TableName) instead");
1102    return compact(tableName, columnFamily, true, compactType);
1103  }
1104
1105  @Override
1106  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1107    return compactRegion(regionName, null, true);
1108  }
1109
1110  @Override
1111  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1112    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1113      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1114    return compactRegion(regionName, columnFamily, true);
1115  }
1116
1117  @Override
1118  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1119    return compactRegionServer(sn, false);
1120  }
1121
1122  @Override
1123  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1124    return compactRegionServer(sn, true);
1125  }
1126
1127  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1128    CompletableFuture<Void> future = new CompletableFuture<>();
1129    addListener(getRegions(sn), (hRegionInfos, err) -> {
1130      if (err != null) {
1131        future.completeExceptionally(err);
1132        return;
1133      }
1134      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1135      if (hRegionInfos != null) {
1136        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1137      }
1138      addListener(CompletableFuture.allOf(
1139        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1140          if (err2 != null) {
1141            future.completeExceptionally(err2);
1142          } else {
1143            future.complete(ret);
1144          }
1145        });
1146    });
1147    return future;
1148  }
1149
1150  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1151    boolean major) {
1152    CompletableFuture<Void> future = new CompletableFuture<>();
1153    addListener(getRegionLocation(regionName), (location, err) -> {
1154      if (err != null) {
1155        future.completeExceptionally(err);
1156        return;
1157      }
1158      ServerName serverName = location.getServerName();
1159      if (serverName == null) {
1160        future
1161          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1162        return;
1163      }
1164      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1165        (ret, err2) -> {
1166          if (err2 != null) {
1167            future.completeExceptionally(err2);
1168          } else {
1169            future.complete(ret);
1170          }
1171        });
1172    });
1173    return future;
1174  }
1175
1176  /**
1177   * List all region locations for the specific table.
1178   */
1179  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1180    if (TableName.META_TABLE_NAME.equals(tableName)) {
1181      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1182      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1183        if (err != null) {
1184          future.completeExceptionally(err);
1185        } else if (
1186          metaRegions == null || metaRegions.isEmpty()
1187            || metaRegions.getDefaultRegionLocation() == null
1188        ) {
1189          future.completeExceptionally(new IOException("meta region does not found"));
1190        } else {
1191          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1192        }
1193      });
1194      return future;
1195    } else {
1196      // For non-meta table, we fetch all locations by scanning hbase:meta table
1197      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1198    }
1199  }
1200
1201  /**
1202   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1203   */
1204  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1205    CompactType compactType) {
1206    CompletableFuture<Void> future = new CompletableFuture<>();
1207
1208    switch (compactType) {
1209      case MOB:
1210        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1211          if (err != null) {
1212            future.completeExceptionally(err);
1213            return;
1214          }
1215          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1216          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1217            if (err2 != null) {
1218              future.completeExceptionally(err2);
1219            } else {
1220              future.complete(ret);
1221            }
1222          });
1223        });
1224        break;
1225      case NORMAL:
1226        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1227          if (err != null) {
1228            future.completeExceptionally(err);
1229            return;
1230          }
1231          if (locations == null || locations.isEmpty()) {
1232            future.completeExceptionally(new TableNotFoundException(tableName));
1233          }
1234          CompletableFuture<?>[] compactFutures =
1235            locations.stream().filter(l -> l.getRegion() != null)
1236              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1237              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1238              .toArray(CompletableFuture<?>[]::new);
1239          // future complete unless all of the compact futures are completed.
1240          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1241            if (err2 != null) {
1242              future.completeExceptionally(err2);
1243            } else {
1244              future.complete(ret);
1245            }
1246          });
1247        });
1248        break;
1249      default:
1250        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1251    }
1252    return future;
1253  }
1254
1255  /**
1256   * Compact the region at specific region server.
1257   */
1258  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1259    final boolean major, byte[] columnFamily) {
1260    return this.<Void> newAdminCaller().serverName(sn)
1261      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1262        Void> adminCall(controller, stub,
1263          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1264          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1265      .call();
1266  }
1267
1268  private byte[] toEncodeRegionName(byte[] regionName) {
1269    return RegionInfo.isEncodedRegionName(regionName)
1270      ? regionName
1271      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1272  }
1273
1274  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1275    CompletableFuture<TableName> result) {
1276    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1277      if (err != null) {
1278        result.completeExceptionally(err);
1279        return;
1280      }
1281      RegionInfo regionInfo = location.getRegion();
1282      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1283        result.completeExceptionally(
1284          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1285        return;
1286      }
1287      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1288        if (!tableName.get().equals(regionInfo.getTable())) {
1289          // tables of this two region should be same.
1290          result.completeExceptionally(
1291            new IllegalArgumentException("Cannot merge regions from two different tables "
1292              + tableName.get() + " and " + regionInfo.getTable()));
1293        } else {
1294          result.complete(tableName.get());
1295        }
1296      }
1297    });
1298  }
1299
1300  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1301    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1302    CompletableFuture<TableName> future = new CompletableFuture<>();
1303    for (byte[] encodedRegionName : encodedRegionNames) {
1304      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1305    }
1306    return future;
1307  }
1308
1309  @Override
1310  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1311    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1312  }
1313
1314  @Override
1315  public CompletableFuture<Boolean> isMergeEnabled() {
1316    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1317  }
1318
1319  @Override
1320  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1321    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1322  }
1323
1324  @Override
1325  public CompletableFuture<Boolean> isSplitEnabled() {
1326    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1327  }
1328
1329  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1330    MasterSwitchType switchType) {
1331    SetSplitOrMergeEnabledRequest request =
1332      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1333    return this.<Boolean> newMasterCaller()
1334      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1335        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1336          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1337          (resp) -> resp.getPrevValueList().get(0)))
1338      .call();
1339  }
1340
1341  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1342    IsSplitOrMergeEnabledRequest request =
1343      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1344    return this.<Boolean> newMasterCaller()
1345      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1346        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1347          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1348      .call();
1349  }
1350
1351  @Override
1352  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1353    if (nameOfRegionsToMerge.size() < 2) {
1354      return failedFuture(new IllegalArgumentException(
1355        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1356    }
1357    CompletableFuture<Void> future = new CompletableFuture<>();
1358    byte[][] encodedNameOfRegionsToMerge =
1359      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1360
1361    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1362      if (err != null) {
1363        future.completeExceptionally(err);
1364        return;
1365      }
1366
1367      final MergeTableRegionsRequest request;
1368      try {
1369        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1370          forcible, ng.getNonceGroup(), ng.newNonce());
1371      } catch (DeserializationException e) {
1372        future.completeExceptionally(e);
1373        return;
1374      }
1375
1376      addListener(
1377        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1378          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1379        (ret, err2) -> {
1380          if (err2 != null) {
1381            future.completeExceptionally(err2);
1382          } else {
1383            future.complete(ret);
1384          }
1385        });
1386    });
1387    return future;
1388  }
1389
1390  @Override
1391  public CompletableFuture<Void> split(TableName tableName) {
1392    CompletableFuture<Void> future = new CompletableFuture<>();
1393    addListener(tableExists(tableName), (exist, error) -> {
1394      if (error != null) {
1395        future.completeExceptionally(error);
1396        return;
1397      }
1398      if (!exist) {
1399        future.completeExceptionally(new TableNotFoundException(tableName));
1400        return;
1401      }
1402      addListener(metaTable
1403        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1404          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1405            ClientMetaTableAccessor.QueryType.REGION))
1406          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1407            ClientMetaTableAccessor.QueryType.REGION))),
1408        (results, err2) -> {
1409          if (err2 != null) {
1410            future.completeExceptionally(err2);
1411            return;
1412          }
1413          if (results != null && !results.isEmpty()) {
1414            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1415            for (Result r : results) {
1416              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1417                continue;
1418              }
1419              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1420              if (rl != null) {
1421                for (HRegionLocation h : rl.getRegionLocations()) {
1422                  if (h != null && h.getServerName() != null) {
1423                    RegionInfo hri = h.getRegion();
1424                    if (
1425                      hri == null || hri.isSplitParent()
1426                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1427                    ) {
1428                      continue;
1429                    }
1430                    splitFutures.add(split(hri, null));
1431                  }
1432                }
1433              }
1434            }
1435            addListener(
1436              CompletableFuture
1437                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1438              (ret, exception) -> {
1439                if (exception != null) {
1440                  future.completeExceptionally(exception);
1441                  return;
1442                }
1443                future.complete(ret);
1444              });
1445          } else {
1446            future.complete(null);
1447          }
1448        });
1449    });
1450    return future;
1451  }
1452
1453  @Override
1454  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1455    CompletableFuture<Void> result = new CompletableFuture<>();
1456    if (splitPoint == null) {
1457      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1458    }
1459    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1460      (loc, err) -> {
1461        if (err != null) {
1462          result.completeExceptionally(err);
1463        } else if (loc == null || loc.getRegion() == null) {
1464          result.completeExceptionally(new IllegalArgumentException(
1465            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1466        } else {
1467          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1468            if (err2 != null) {
1469              result.completeExceptionally(err2);
1470            } else {
1471              result.complete(ret);
1472            }
1473
1474          });
1475        }
1476      });
1477    return result;
1478  }
1479
1480  @Override
1481  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1482    CompletableFuture<Void> future = new CompletableFuture<>();
1483    addListener(getRegionLocation(regionName), (location, err) -> {
1484      if (err != null) {
1485        future.completeExceptionally(err);
1486        return;
1487      }
1488      RegionInfo regionInfo = location.getRegion();
1489      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1490        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1491          + "Replicas are auto-split when their primary is split."));
1492        return;
1493      }
1494      ServerName serverName = location.getServerName();
1495      if (serverName == null) {
1496        future
1497          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1498        return;
1499      }
1500      addListener(split(regionInfo, null), (ret, err2) -> {
1501        if (err2 != null) {
1502          future.completeExceptionally(err2);
1503        } else {
1504          future.complete(ret);
1505        }
1506      });
1507    });
1508    return future;
1509  }
1510
1511  @Override
1512  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1513    Preconditions.checkNotNull(splitPoint,
1514      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1515    CompletableFuture<Void> future = new CompletableFuture<>();
1516    addListener(getRegionLocation(regionName), (location, err) -> {
1517      if (err != null) {
1518        future.completeExceptionally(err);
1519        return;
1520      }
1521      RegionInfo regionInfo = location.getRegion();
1522      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1523        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1524          + "Replicas are auto-split when their primary is split."));
1525        return;
1526      }
1527      ServerName serverName = location.getServerName();
1528      if (serverName == null) {
1529        future
1530          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1531        return;
1532      }
1533      if (
1534        regionInfo.getStartKey() != null
1535          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1536      ) {
1537        future.completeExceptionally(
1538          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1539        return;
1540      }
1541      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1542        if (err2 != null) {
1543          future.completeExceptionally(err2);
1544        } else {
1545          future.complete(ret);
1546        }
1547      });
1548    });
1549    return future;
1550  }
1551
1552  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1553    CompletableFuture<Void> future = new CompletableFuture<>();
1554    TableName tableName = hri.getTable();
1555    final SplitTableRegionRequest request;
1556    try {
1557      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1558        ng.newNonce());
1559    } catch (DeserializationException e) {
1560      future.completeExceptionally(e);
1561      return future;
1562    }
1563
1564    addListener(
1565      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1566        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1567      (ret, err2) -> {
1568        if (err2 != null) {
1569          future.completeExceptionally(err2);
1570        } else {
1571          future.complete(ret);
1572        }
1573      });
1574    return future;
1575  }
1576
1577  @Override
1578  public CompletableFuture<Void> assign(byte[] regionName) {
1579    CompletableFuture<Void> future = new CompletableFuture<>();
1580    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1581      if (err != null) {
1582        future.completeExceptionally(err);
1583        return;
1584      }
1585      addListener(
1586        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1587          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1588            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1589            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1590          .call(),
1591        (ret, err2) -> {
1592          if (err2 != null) {
1593            future.completeExceptionally(err2);
1594          } else {
1595            future.complete(ret);
1596          }
1597        });
1598    });
1599    return future;
1600  }
1601
1602  @Override
1603  public CompletableFuture<Void> unassign(byte[] regionName) {
1604    CompletableFuture<Void> future = new CompletableFuture<>();
1605    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1606      if (err != null) {
1607        future.completeExceptionally(err);
1608        return;
1609      }
1610      addListener(
1611        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1612          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1613            Void> call(controller, stub,
1614              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1615              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1616          .call(),
1617        (ret, err2) -> {
1618          if (err2 != null) {
1619            future.completeExceptionally(err2);
1620          } else {
1621            future.complete(ret);
1622          }
1623        });
1624    });
1625    return future;
1626  }
1627
1628  @Override
1629  public CompletableFuture<Void> offline(byte[] regionName) {
1630    CompletableFuture<Void> future = new CompletableFuture<>();
1631    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1632      if (err != null) {
1633        future.completeExceptionally(err);
1634        return;
1635      }
1636      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1637        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1638          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1639          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1640        .call(), (ret, err2) -> {
1641          if (err2 != null) {
1642            future.completeExceptionally(err2);
1643          } else {
1644            future.complete(ret);
1645          }
1646        });
1647    });
1648    return future;
1649  }
1650
1651  @Override
1652  public CompletableFuture<Void> move(byte[] regionName) {
1653    CompletableFuture<Void> future = new CompletableFuture<>();
1654    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1655      if (err != null) {
1656        future.completeExceptionally(err);
1657        return;
1658      }
1659      addListener(
1660        moveRegion(regionInfo,
1661          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1662        (ret, err2) -> {
1663          if (err2 != null) {
1664            future.completeExceptionally(err2);
1665          } else {
1666            future.complete(ret);
1667          }
1668        });
1669    });
1670    return future;
1671  }
1672
1673  @Override
1674  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1675    Preconditions.checkNotNull(destServerName,
1676      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1677    CompletableFuture<Void> future = new CompletableFuture<>();
1678    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1679      if (err != null) {
1680        future.completeExceptionally(err);
1681        return;
1682      }
1683      addListener(
1684        moveRegion(regionInfo, RequestConverter
1685          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1686        (ret, err2) -> {
1687          if (err2 != null) {
1688            future.completeExceptionally(err2);
1689          } else {
1690            future.complete(ret);
1691          }
1692        });
1693    });
1694    return future;
1695  }
1696
1697  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1698    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1699      .action(
1700        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1701          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1702      .call();
1703  }
1704
1705  @Override
1706  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1707    return this.<Void> newMasterCaller()
1708      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1709        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1710        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1711      .call();
1712  }
1713
1714  @Override
1715  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1716    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1717    Scan scan = QuotaTableUtil.makeScan(filter);
1718    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1719      new AdvancedScanResultConsumer() {
1720        List<QuotaSettings> settings = new ArrayList<>();
1721
1722        @Override
1723        public void onNext(Result[] results, ScanController controller) {
1724          for (Result result : results) {
1725            try {
1726              QuotaTableUtil.parseResultToCollection(result, settings);
1727            } catch (IOException e) {
1728              controller.terminate();
1729              future.completeExceptionally(e);
1730            }
1731          }
1732        }
1733
1734        @Override
1735        public void onError(Throwable error) {
1736          future.completeExceptionally(error);
1737        }
1738
1739        @Override
1740        public void onComplete() {
1741          future.complete(settings);
1742        }
1743      });
1744    return future;
1745  }
1746
1747  @Override
1748  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1749    boolean enabled) {
1750    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1751      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1752      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1753      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1754  }
1755
1756  @Override
1757  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1758    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1759      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1760      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1761      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1762  }
1763
1764  @Override
1765  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1766    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1767      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1768      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1769      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1770  }
1771
1772  @Override
1773  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1774    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1775      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1776      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1777      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1778  }
1779
1780  @Override
1781  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1782    return this.<ReplicationPeerConfig> newMasterCaller()
1783      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1784        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1785          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1786          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1787          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1788      .call();
1789  }
1790
1791  @Override
1792  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1793    ReplicationPeerConfig peerConfig) {
1794    return this.<UpdateReplicationPeerConfigRequest,
1795      UpdateReplicationPeerConfigResponse> procedureCall(
1796        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1797        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1798        (resp) -> resp.getProcId(),
1799        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1800  }
1801
1802  @Override
1803  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1804    SyncReplicationState clusterState) {
1805    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1806      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1807        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1808          clusterState),
1809        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1810        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1811          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1812  }
1813
1814  @Override
1815  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1816    Map<TableName, List<String>> tableCfs) {
1817    if (tableCfs == null) {
1818      return failedFuture(new ReplicationException("tableCfs is null"));
1819    }
1820
1821    CompletableFuture<Void> future = new CompletableFuture<Void>();
1822    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1823      if (!completeExceptionally(future, error)) {
1824        ReplicationPeerConfig newPeerConfig =
1825          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1826        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1827          if (!completeExceptionally(future, error)) {
1828            future.complete(result);
1829          }
1830        });
1831      }
1832    });
1833    return future;
1834  }
1835
1836  @Override
1837  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1838    Map<TableName, List<String>> tableCfs) {
1839    if (tableCfs == null) {
1840      return failedFuture(new ReplicationException("tableCfs is null"));
1841    }
1842
1843    CompletableFuture<Void> future = new CompletableFuture<Void>();
1844    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1845      if (!completeExceptionally(future, error)) {
1846        ReplicationPeerConfig newPeerConfig = null;
1847        try {
1848          newPeerConfig = ReplicationPeerConfigUtil
1849            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1850        } catch (ReplicationException e) {
1851          future.completeExceptionally(e);
1852          return;
1853        }
1854        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1855          if (!completeExceptionally(future, error)) {
1856            future.complete(result);
1857          }
1858        });
1859      }
1860    });
1861    return future;
1862  }
1863
1864  @Override
1865  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1866    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1867  }
1868
1869  @Override
1870  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1871    Preconditions.checkNotNull(pattern,
1872      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1873    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1874  }
1875
1876  private CompletableFuture<List<ReplicationPeerDescription>>
1877    listReplicationPeers(ListReplicationPeersRequest request) {
1878    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1879      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1880        List<ReplicationPeerDescription>> call(controller, stub, request,
1881          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1882          (resp) -> resp.getPeerDescList().stream()
1883            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1884            .collect(Collectors.toList())))
1885      .call();
1886  }
1887
1888  @Override
1889  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1890    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1891    addListener(listTableDescriptors(), (tables, error) -> {
1892      if (!completeExceptionally(future, error)) {
1893        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1894        tables.forEach(table -> {
1895          Map<String, Integer> cfs = new HashMap<>();
1896          Stream.of(table.getColumnFamilies())
1897            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1898            .forEach(column -> {
1899              cfs.put(column.getNameAsString(), column.getScope());
1900            });
1901          if (!cfs.isEmpty()) {
1902            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1903          }
1904        });
1905        future.complete(replicatedTableCFs);
1906      }
1907    });
1908    return future;
1909  }
1910
1911  @Override
1912  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1913    SnapshotProtos.SnapshotDescription snapshot =
1914      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1915    try {
1916      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1917    } catch (IllegalArgumentException e) {
1918      return failedFuture(e);
1919    }
1920    CompletableFuture<Void> future = new CompletableFuture<>();
1921    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
1922      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
1923    addListener(this.<SnapshotResponse> newMasterCaller()
1924      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
1925        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
1926      .call(), (resp, err) -> {
1927        if (err != null) {
1928          future.completeExceptionally(err);
1929          return;
1930        }
1931        waitSnapshotFinish(snapshotDesc, future, resp);
1932      });
1933    return future;
1934  }
1935
1936  // This is for keeping compatibility with old implementation.
1937  // If there is a procId field in the response, then the snapshot will be operated with a
1938  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
1939  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
1940    SnapshotResponse resp) {
1941    if (resp.hasProcId()) {
1942      getProcedureResult(resp.getProcId(), future, 0);
1943      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
1944    } else {
1945      long expectedTimeout = resp.getExpectedTimeout();
1946      TimerTask pollingTask = new TimerTask() {
1947        int tries = 0;
1948        long startTime = EnvironmentEdgeManager.currentTime();
1949        long endTime = startTime + expectedTimeout;
1950        long maxPauseTime = expectedTimeout / maxAttempts;
1951
1952        @Override
1953        public void run(Timeout timeout) throws Exception {
1954          if (EnvironmentEdgeManager.currentTime() < endTime) {
1955            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
1956              if (err2 != null) {
1957                future.completeExceptionally(err2);
1958              } else if (done) {
1959                future.complete(null);
1960              } else {
1961                // retry again after pauseTime.
1962                long pauseTime =
1963                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1964                pauseTime = Math.min(pauseTime, maxPauseTime);
1965                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
1966              }
1967            });
1968          } else {
1969            future
1970              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
1971                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
1972          }
1973        }
1974      };
1975      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1976    }
1977  }
1978
1979  @Override
1980  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1981    return this.<Boolean> newMasterCaller()
1982      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
1983        Boolean> call(controller, stub,
1984          IsSnapshotDoneRequest.newBuilder()
1985            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
1986          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
1987      .call();
1988  }
1989
1990  @Override
1991  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1992    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1993      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1994      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1995    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1996  }
1997
1998  @Override
1999  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2000    boolean restoreAcl) {
2001    CompletableFuture<Void> future = new CompletableFuture<>();
2002    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2003      if (err != null) {
2004        future.completeExceptionally(err);
2005        return;
2006      }
2007      TableName tableName = null;
2008      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2009        for (SnapshotDescription snap : snapshotDescriptions) {
2010          if (snap.getName().equals(snapshotName)) {
2011            tableName = snap.getTableName();
2012            break;
2013          }
2014        }
2015      }
2016      if (tableName == null) {
2017        future.completeExceptionally(new RestoreSnapshotException(
2018          "Unable to find the table name for snapshot=" + snapshotName));
2019        return;
2020      }
2021      final TableName finalTableName = tableName;
2022      addListener(tableExists(finalTableName), (exists, err2) -> {
2023        if (err2 != null) {
2024          future.completeExceptionally(err2);
2025        } else if (!exists) {
2026          // if table does not exist, then just clone snapshot into new table.
2027          completeConditionalOnFuture(future,
2028            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2029        } else {
2030          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2031            if (err4 != null) {
2032              future.completeExceptionally(err4);
2033            } else if (!disabled) {
2034              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2035            } else {
2036              completeConditionalOnFuture(future,
2037                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2038            }
2039          });
2040        }
2041      });
2042    });
2043    return future;
2044  }
2045
2046  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2047    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2048    if (takeFailSafeSnapshot) {
2049      CompletableFuture<Void> future = new CompletableFuture<>();
2050      // Step.1 Take a snapshot of the current state
2051      String failSafeSnapshotSnapshotNameFormat =
2052        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2053          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2054      final String failSafeSnapshotSnapshotName =
2055        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2056          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2057          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2058      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2059      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2060        if (err != null) {
2061          future.completeExceptionally(err);
2062        } else {
2063          // Step.2 Restore snapshot
2064          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2065            (void2, err2) -> {
2066              if (err2 != null) {
2067                // Step.3.a Something went wrong during the restore and try to rollback.
2068                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2069                  restoreAcl, null), (void3, err3) -> {
2070                    if (err3 != null) {
2071                      future.completeExceptionally(err3);
2072                    } else {
2073                      String msg =
2074                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2075                          + failSafeSnapshotSnapshotName + " succeeded.";
2076                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2077                    }
2078                  });
2079              } else {
2080                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2081                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2082                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2083                  if (err3 != null) {
2084                    LOG.error(
2085                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2086                      err3);
2087                  }
2088                  future.complete(ret3);
2089                });
2090              }
2091            });
2092        }
2093      });
2094      return future;
2095    } else {
2096      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2097    }
2098  }
2099
2100  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2101    CompletableFuture<T> parentFuture) {
2102    addListener(parentFuture, (res, err) -> {
2103      if (err != null) {
2104        dependentFuture.completeExceptionally(err);
2105      } else {
2106        dependentFuture.complete(res);
2107      }
2108    });
2109  }
2110
2111  @Override
2112  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2113    boolean restoreAcl, String customSFT) {
2114    CompletableFuture<Void> future = new CompletableFuture<>();
2115    addListener(tableExists(tableName), (exists, err) -> {
2116      if (err != null) {
2117        future.completeExceptionally(err);
2118      } else if (exists) {
2119        future.completeExceptionally(new TableExistsException(tableName));
2120      } else {
2121        completeConditionalOnFuture(future,
2122          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2123      }
2124    });
2125    return future;
2126  }
2127
2128  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2129    boolean restoreAcl, String customSFT) {
2130    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2131      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2132    try {
2133      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2134    } catch (IllegalArgumentException e) {
2135      return failedFuture(e);
2136    }
2137    RestoreSnapshotRequest.Builder builder =
2138      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2139        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2140    if (customSFT != null) {
2141      builder.setCustomSFT(customSFT);
2142    }
2143    return waitProcedureResult(this.<Long> newMasterCaller()
2144      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2145        Long> call(controller, stub, builder.build(),
2146          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2147      .call());
2148  }
2149
2150  @Override
2151  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2152    return getCompletedSnapshots(null);
2153  }
2154
2155  @Override
2156  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2157    Preconditions.checkNotNull(pattern,
2158      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2159    return getCompletedSnapshots(pattern);
2160  }
2161
2162  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2163    return this.<List<SnapshotDescription>> newMasterCaller()
2164      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2165        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2166          GetCompletedSnapshotsRequest.newBuilder().build(),
2167          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2168          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2169      .call();
2170  }
2171
2172  @Override
2173  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2174    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2175      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2176    return getCompletedSnapshots(tableNamePattern, null);
2177  }
2178
2179  @Override
2180  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2181    Pattern snapshotNamePattern) {
2182    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2183      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2184    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2185      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2186    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2187  }
2188
2189  private CompletableFuture<List<SnapshotDescription>>
2190    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2191    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2192    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2193      if (err != null) {
2194        future.completeExceptionally(err);
2195        return;
2196      }
2197      if (tableNames == null || tableNames.size() <= 0) {
2198        future.complete(Collections.emptyList());
2199        return;
2200      }
2201      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2202        if (err2 != null) {
2203          future.completeExceptionally(err2);
2204          return;
2205        }
2206        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2207          future.complete(Collections.emptyList());
2208          return;
2209        }
2210        future.complete(snapshotDescList.stream()
2211          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2212          .collect(Collectors.toList()));
2213      });
2214    });
2215    return future;
2216  }
2217
2218  @Override
2219  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2220    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2221  }
2222
2223  @Override
2224  public CompletableFuture<Void> deleteSnapshots() {
2225    return internalDeleteSnapshots(null, null);
2226  }
2227
2228  @Override
2229  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2230    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2231      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2232    return internalDeleteSnapshots(null, snapshotNamePattern);
2233  }
2234
2235  @Override
2236  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2237    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2238      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2239    return internalDeleteSnapshots(tableNamePattern, null);
2240  }
2241
2242  @Override
2243  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2244    Pattern snapshotNamePattern) {
2245    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2246      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2247    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2248      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2249    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2250  }
2251
2252  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2253    Pattern snapshotNamePattern) {
2254    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2255    if (tableNamePattern == null) {
2256      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2257    } else {
2258      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2259    }
2260    CompletableFuture<Void> future = new CompletableFuture<>();
2261    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2262      if (err != null) {
2263        future.completeExceptionally(err);
2264        return;
2265      }
2266      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2267        future.complete(null);
2268        return;
2269      }
2270      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2271        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2272          if (e != null) {
2273            future.completeExceptionally(e);
2274          } else {
2275            future.complete(v);
2276          }
2277        });
2278    });
2279    return future;
2280  }
2281
2282  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2283    return this.<Void> newMasterCaller()
2284      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2285        controller, stub,
2286        DeleteSnapshotRequest.newBuilder()
2287          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2288        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2289      .call();
2290  }
2291
2292  @Override
2293  public CompletableFuture<Void> execProcedure(String signature, String instance,
2294    Map<String, String> props) {
2295    CompletableFuture<Void> future = new CompletableFuture<>();
2296    ProcedureDescription procDesc =
2297      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2298    addListener(this.<Long> newMasterCaller()
2299      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2300        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2301        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2302      .call(), (expectedTimeout, err) -> {
2303        if (err != null) {
2304          future.completeExceptionally(err);
2305          return;
2306        }
2307        TimerTask pollingTask = new TimerTask() {
2308          int tries = 0;
2309          long startTime = EnvironmentEdgeManager.currentTime();
2310          long endTime = startTime + expectedTimeout;
2311          long maxPauseTime = expectedTimeout / maxAttempts;
2312
2313          @Override
2314          public void run(Timeout timeout) throws Exception {
2315            if (EnvironmentEdgeManager.currentTime() < endTime) {
2316              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2317                if (err2 != null) {
2318                  future.completeExceptionally(err2);
2319                  return;
2320                }
2321                if (done) {
2322                  future.complete(null);
2323                } else {
2324                  // retry again after pauseTime.
2325                  long pauseTime =
2326                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2327                  pauseTime = Math.min(pauseTime, maxPauseTime);
2328                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2329                    TimeUnit.MICROSECONDS);
2330                }
2331              });
2332            } else {
2333              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2334                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2335            }
2336          }
2337        };
2338        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2339        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2340      });
2341    return future;
2342  }
2343
2344  @Override
2345  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2346    Map<String, String> props) {
2347    ProcedureDescription proDesc =
2348      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2349    return this.<byte[]> newMasterCaller()
2350      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2351        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2352        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2353        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2354      .call();
2355  }
2356
2357  @Override
2358  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2359    Map<String, String> props) {
2360    ProcedureDescription proDesc =
2361      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2362    return this.<Boolean> newMasterCaller()
2363      .action(
2364        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2365          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2366          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2367      .call();
2368  }
2369
2370  @Override
2371  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2372    return this.<Boolean> newMasterCaller().action(
2373      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2374        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2375        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2376      .call();
2377  }
2378
2379  @Override
2380  public CompletableFuture<String> getProcedures() {
2381    return this.<String> newMasterCaller()
2382      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2383        controller, stub, GetProceduresRequest.newBuilder().build(),
2384        (s, c, req, done) -> s.getProcedures(c, req, done),
2385        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2386      .call();
2387  }
2388
2389  @Override
2390  public CompletableFuture<String> getLocks() {
2391    return this.<String> newMasterCaller()
2392      .action(
2393        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2394          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2395          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2396      .call();
2397  }
2398
2399  @Override
2400  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2401    boolean offload) {
2402    return this.<Void> newMasterCaller()
2403      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2404        DecommissionRegionServersResponse, Void> call(controller, stub,
2405          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2406          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2407      .call();
2408  }
2409
2410  @Override
2411  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2412    return this.<List<ServerName>> newMasterCaller()
2413      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2414        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2415          ListDecommissionedRegionServersRequest.newBuilder().build(),
2416          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2417          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2418            .collect(Collectors.toList())))
2419      .call();
2420  }
2421
2422  @Override
2423  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2424    List<byte[]> encodedRegionNames) {
2425    return this.<Void> newMasterCaller()
2426      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2427        RecommissionRegionServerResponse, Void> call(controller, stub,
2428          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2429          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2430      .call();
2431  }
2432
2433  /**
2434   * Get the region location for the passed region name. The region name may be a full region name
2435   * or encoded region name. If the region does not found, then it'll throw an
2436   * UnknownRegionException wrapped by a {@link CompletableFuture}
2437   * @param regionNameOrEncodedRegionName region name or encoded region name
2438   * @return region location, wrapped by a {@link CompletableFuture}
2439   */
2440  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2441    if (regionNameOrEncodedRegionName == null) {
2442      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2443    }
2444
2445    CompletableFuture<Optional<HRegionLocation>> future;
2446    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2447      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2448      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2449        // old format encodedName, should be meta region
2450        future = connection.registry.getMetaRegionLocations()
2451          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2452            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2453      } else {
2454        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2455          regionNameOrEncodedRegionName);
2456      }
2457    } else {
2458      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2459      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2460      RegionInfo regionInfo;
2461      try {
2462        regionInfo =
2463          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2464      } catch (IOException ioe) {
2465        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2466      }
2467
2468      if (regionInfo.isMetaRegion()) {
2469        future = connection.registry.getMetaRegionLocations()
2470          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2471            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2472            .findFirst());
2473      } else {
2474        future =
2475          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2476      }
2477    }
2478
2479    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2480    addListener(future, (location, err) -> {
2481      if (err != null) {
2482        returnedFuture.completeExceptionally(err);
2483        return;
2484      }
2485      if (!location.isPresent() || location.get().getRegion() == null) {
2486        returnedFuture.completeExceptionally(
2487          new UnknownRegionException("Invalid region name or encoded region name: "
2488            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2489      } else {
2490        returnedFuture.complete(location.get());
2491      }
2492    });
2493    return returnedFuture;
2494  }
2495
2496  /**
2497   * Get the region info for the passed region name. The region name may be a full region name or
2498   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2499   * wrapped by a {@link CompletableFuture}
2500   * @return region info, wrapped by a {@link CompletableFuture}
2501   */
2502  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2503    if (regionNameOrEncodedRegionName == null) {
2504      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2505    }
2506
2507    if (
2508      Bytes.equals(regionNameOrEncodedRegionName,
2509        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2510        || Bytes.equals(regionNameOrEncodedRegionName,
2511          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2512    ) {
2513      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2514    }
2515
2516    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2517    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2518      if (err != null) {
2519        future.completeExceptionally(err);
2520      } else {
2521        future.complete(location.getRegion());
2522      }
2523    });
2524    return future;
2525  }
2526
2527  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2528    if (numRegions < 3) {
2529      throw new IllegalArgumentException("Must create at least three regions");
2530    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2531      throw new IllegalArgumentException("Start key must be smaller than end key");
2532    }
2533    if (numRegions == 3) {
2534      return new byte[][] { startKey, endKey };
2535    }
2536    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2537    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2538      throw new IllegalArgumentException("Unable to split key range into enough regions");
2539    }
2540    return splitKeys;
2541  }
2542
2543  private void verifySplitKeys(byte[][] splitKeys) {
2544    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2545    // Verify there are no duplicate split keys
2546    byte[] lastKey = null;
2547    for (byte[] splitKey : splitKeys) {
2548      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2549        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2550      }
2551      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2552        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2553          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2554      }
2555      lastKey = splitKey;
2556    }
2557  }
2558
2559  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2560
2561    abstract void onFinished();
2562
2563    abstract void onError(Throwable error);
2564
2565    @Override
2566    public void accept(Void v, Throwable error) {
2567      if (error != null) {
2568        onError(error);
2569        return;
2570      }
2571      onFinished();
2572    }
2573  }
2574
2575  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2576    protected final TableName tableName;
2577
2578    TableProcedureBiConsumer(TableName tableName) {
2579      this.tableName = tableName;
2580    }
2581
2582    abstract String getOperationType();
2583
2584    String getDescription() {
2585      return "Operation: " + getOperationType() + ", " + "Table Name: "
2586        + tableName.getNameWithNamespaceInclAsString();
2587    }
2588
2589    @Override
2590    void onFinished() {
2591      LOG.info(getDescription() + " completed");
2592    }
2593
2594    @Override
2595    void onError(Throwable error) {
2596      LOG.info(getDescription() + " failed with " + error.getMessage());
2597    }
2598  }
2599
2600  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2601    protected final String namespaceName;
2602
2603    NamespaceProcedureBiConsumer(String namespaceName) {
2604      this.namespaceName = namespaceName;
2605    }
2606
2607    abstract String getOperationType();
2608
2609    String getDescription() {
2610      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2611    }
2612
2613    @Override
2614    void onFinished() {
2615      LOG.info(getDescription() + " completed");
2616    }
2617
2618    @Override
2619    void onError(Throwable error) {
2620      LOG.info(getDescription() + " failed with " + error.getMessage());
2621    }
2622  }
2623
2624  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2625
2626    CreateTableProcedureBiConsumer(TableName tableName) {
2627      super(tableName);
2628    }
2629
2630    @Override
2631    String getOperationType() {
2632      return "CREATE";
2633    }
2634  }
2635
2636  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2637
2638    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2639      super(tableName);
2640    }
2641
2642    @Override
2643    String getOperationType() {
2644      return "MODIFY";
2645    }
2646  }
2647
2648  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2649    extends TableProcedureBiConsumer {
2650
2651    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2652      super(tableName);
2653    }
2654
2655    @Override
2656    String getOperationType() {
2657      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2658    }
2659  }
2660
2661  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2662
2663    DeleteTableProcedureBiConsumer(TableName tableName) {
2664      super(tableName);
2665    }
2666
2667    @Override
2668    String getOperationType() {
2669      return "DELETE";
2670    }
2671
2672    @Override
2673    void onFinished() {
2674      connection.getLocator().clearCache(this.tableName);
2675      super.onFinished();
2676    }
2677  }
2678
2679  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2680
2681    TruncateTableProcedureBiConsumer(TableName tableName) {
2682      super(tableName);
2683    }
2684
2685    @Override
2686    String getOperationType() {
2687      return "TRUNCATE";
2688    }
2689  }
2690
2691  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2692
2693    EnableTableProcedureBiConsumer(TableName tableName) {
2694      super(tableName);
2695    }
2696
2697    @Override
2698    String getOperationType() {
2699      return "ENABLE";
2700    }
2701  }
2702
2703  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2704
2705    DisableTableProcedureBiConsumer(TableName tableName) {
2706      super(tableName);
2707    }
2708
2709    @Override
2710    String getOperationType() {
2711      return "DISABLE";
2712    }
2713  }
2714
2715  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2716
2717    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2718      super(tableName);
2719    }
2720
2721    @Override
2722    String getOperationType() {
2723      return "ADD_COLUMN_FAMILY";
2724    }
2725  }
2726
2727  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2728
2729    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2730      super(tableName);
2731    }
2732
2733    @Override
2734    String getOperationType() {
2735      return "DELETE_COLUMN_FAMILY";
2736    }
2737  }
2738
2739  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2740
2741    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2742      super(tableName);
2743    }
2744
2745    @Override
2746    String getOperationType() {
2747      return "MODIFY_COLUMN_FAMILY";
2748    }
2749  }
2750
2751  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2752    extends TableProcedureBiConsumer {
2753
2754    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2755      super(tableName);
2756    }
2757
2758    @Override
2759    String getOperationType() {
2760      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2761    }
2762  }
2763
2764  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2765
2766    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2767      super(namespaceName);
2768    }
2769
2770    @Override
2771    String getOperationType() {
2772      return "CREATE_NAMESPACE";
2773    }
2774  }
2775
2776  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2777
2778    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2779      super(namespaceName);
2780    }
2781
2782    @Override
2783    String getOperationType() {
2784      return "DELETE_NAMESPACE";
2785    }
2786  }
2787
2788  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2789
2790    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2791      super(namespaceName);
2792    }
2793
2794    @Override
2795    String getOperationType() {
2796      return "MODIFY_NAMESPACE";
2797    }
2798  }
2799
2800  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2801
2802    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2803      super(tableName);
2804    }
2805
2806    @Override
2807    String getOperationType() {
2808      return "MERGE_REGIONS";
2809    }
2810  }
2811
2812  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2813
2814    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2815      super(tableName);
2816    }
2817
2818    @Override
2819    String getOperationType() {
2820      return "SPLIT_REGION";
2821    }
2822  }
2823
2824  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2825    SnapshotProcedureBiConsumer(TableName tableName) {
2826      super(tableName);
2827    }
2828
2829    @Override
2830    String getOperationType() {
2831      return "SNAPSHOT";
2832    }
2833  }
2834
2835  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2836    private final String peerId;
2837    private final Supplier<String> getOperation;
2838
2839    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2840      this.peerId = peerId;
2841      this.getOperation = getOperation;
2842    }
2843
2844    String getDescription() {
2845      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2846    }
2847
2848    @Override
2849    void onFinished() {
2850      LOG.info(getDescription() + " completed");
2851    }
2852
2853    @Override
2854    void onError(Throwable error) {
2855      LOG.info(getDescription() + " failed with " + error.getMessage());
2856    }
2857  }
2858
2859  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2860    CompletableFuture<Void> future = new CompletableFuture<>();
2861    addListener(procFuture, (procId, error) -> {
2862      if (error != null) {
2863        future.completeExceptionally(error);
2864        return;
2865      }
2866      getProcedureResult(procId, future, 0);
2867    });
2868    return future;
2869  }
2870
2871  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2872    addListener(
2873      this.<GetProcedureResultResponse> newMasterCaller()
2874        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
2875          GetProcedureResultResponse> call(controller, stub,
2876            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2877            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2878        .call(),
2879      (response, error) -> {
2880        if (error != null) {
2881          LOG.warn("failed to get the procedure result procId={}", procId,
2882            ConnectionUtils.translateException(error));
2883          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2884            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2885          return;
2886        }
2887        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2888          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2889            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2890          return;
2891        }
2892        if (response.hasException()) {
2893          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2894          future.completeExceptionally(ioe);
2895        } else {
2896          future.complete(null);
2897        }
2898      });
2899  }
2900
2901  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2902    CompletableFuture<T> future = new CompletableFuture<>();
2903    future.completeExceptionally(error);
2904    return future;
2905  }
2906
2907  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2908    if (error != null) {
2909      future.completeExceptionally(error);
2910      return true;
2911    }
2912    return false;
2913  }
2914
2915  @Override
2916  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2917    return getClusterMetrics(EnumSet.allOf(Option.class));
2918  }
2919
2920  @Override
2921  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2922    return this.<ClusterMetrics> newMasterCaller()
2923      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
2924        ClusterMetrics> call(controller, stub,
2925          RequestConverter.buildGetClusterStatusRequest(options),
2926          (s, c, req, done) -> s.getClusterStatus(c, req, done),
2927          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
2928      .call();
2929  }
2930
2931  @Override
2932  public CompletableFuture<Void> shutdown() {
2933    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2934      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2935        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2936        resp -> null))
2937      .call();
2938  }
2939
2940  @Override
2941  public CompletableFuture<Void> stopMaster() {
2942    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2943      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2944        controller, stub, StopMasterRequest.newBuilder().build(),
2945        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2946      .call();
2947  }
2948
2949  @Override
2950  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2951    StopServerRequest request = RequestConverter
2952      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2953    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2954      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2955        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2956        resp -> null))
2957      .serverName(serverName).call();
2958  }
2959
2960  @Override
2961  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2962    return this.<Void> newAdminCaller()
2963      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
2964        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2965          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2966      .serverName(serverName).call();
2967  }
2968
2969  @Override
2970  public CompletableFuture<Void> updateConfiguration() {
2971    CompletableFuture<Void> future = new CompletableFuture<Void>();
2972    addListener(
2973      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2974      (status, err) -> {
2975        if (err != null) {
2976          future.completeExceptionally(err);
2977        } else {
2978          List<CompletableFuture<Void>> futures = new ArrayList<>();
2979          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2980          futures.add(updateConfiguration(status.getMasterName()));
2981          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2982          addListener(
2983            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2984            (result, err2) -> {
2985              if (err2 != null) {
2986                future.completeExceptionally(err2);
2987              } else {
2988                future.complete(result);
2989              }
2990            });
2991        }
2992      });
2993    return future;
2994  }
2995
2996  @Override
2997  public CompletableFuture<Void> updateConfiguration(String groupName) {
2998    CompletableFuture<Void> future = new CompletableFuture<Void>();
2999    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3000      if (err != null) {
3001        future.completeExceptionally(err);
3002      } else if (rsGroupInfo == null) {
3003        future.completeExceptionally(
3004          new IllegalArgumentException("Group does not exist: " + groupName));
3005      } else {
3006        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3007          if (err2 != null) {
3008            future.completeExceptionally(err2);
3009          } else {
3010            List<CompletableFuture<Void>> futures = new ArrayList<>();
3011            List<ServerName> groupServers = status.getServersName().stream()
3012              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3013            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3014            addListener(
3015              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3016              (result, err3) -> {
3017                if (err3 != null) {
3018                  future.completeExceptionally(err3);
3019                } else {
3020                  future.complete(result);
3021                }
3022              });
3023          }
3024        });
3025      }
3026    });
3027    return future;
3028  }
3029
3030  @Override
3031  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3032    return this.<Void> newAdminCaller()
3033      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3034        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3035          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3036      .serverName(serverName).call();
3037  }
3038
3039  @Override
3040  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3041    return this.<Void> newAdminCaller()
3042      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3043        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3044          RequestConverter.buildClearCompactionQueuesRequest(queues),
3045          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3046      .serverName(serverName).call();
3047  }
3048
3049  @Override
3050  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3051    return this.<List<SecurityCapability>> newMasterCaller()
3052      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3053        List<SecurityCapability>> call(controller, stub,
3054          SecurityCapabilitiesRequest.newBuilder().build(),
3055          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3056          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3057      .call();
3058  }
3059
3060  @Override
3061  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3062    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3063  }
3064
3065  @Override
3066  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3067    TableName tableName) {
3068    Preconditions.checkNotNull(tableName,
3069      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3070    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3071  }
3072
3073  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3074    ServerName serverName) {
3075    return this.<List<RegionMetrics>> newAdminCaller()
3076      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3077        List<RegionMetrics>> adminCall(controller, stub, request,
3078          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3079          RegionMetricsBuilder::toRegionMetrics))
3080      .serverName(serverName).call();
3081  }
3082
3083  @Override
3084  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3085    return this.<Boolean> newMasterCaller()
3086      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3087        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3088          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3089          resp -> resp.getInMaintenanceMode()))
3090      .call();
3091  }
3092
3093  @Override
3094  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3095    CompactType compactType) {
3096    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3097
3098    switch (compactType) {
3099      case MOB:
3100        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3101          if (err != null) {
3102            future.completeExceptionally(err);
3103            return;
3104          }
3105          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3106
3107          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3108            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3109              GetRegionInfoResponse> adminCall(controller, stub,
3110                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3111                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3112            .call(), (resp2, err2) -> {
3113              if (err2 != null) {
3114                future.completeExceptionally(err2);
3115              } else {
3116                if (resp2.hasCompactionState()) {
3117                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3118                } else {
3119                  future.complete(CompactionState.NONE);
3120                }
3121              }
3122            });
3123        });
3124        break;
3125      case NORMAL:
3126        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3127          if (err != null) {
3128            future.completeExceptionally(err);
3129            return;
3130          }
3131          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3132          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3133          locations.stream().filter(loc -> loc.getServerName() != null)
3134            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3135            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3136              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3137                // If any region compaction state is MAJOR_AND_MINOR
3138                // the table compaction state is MAJOR_AND_MINOR, too.
3139                if (err2 != null) {
3140                  future.completeExceptionally(unwrapCompletionException(err2));
3141                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3142                  future.complete(regionState);
3143                } else {
3144                  regionStates.add(regionState);
3145                }
3146              }));
3147            });
3148          addListener(
3149            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3150            (ret, err3) -> {
3151              // If future not completed, check all regions's compaction state
3152              if (!future.isCompletedExceptionally() && !future.isDone()) {
3153                CompactionState state = CompactionState.NONE;
3154                for (CompactionState regionState : regionStates) {
3155                  switch (regionState) {
3156                    case MAJOR:
3157                      if (state == CompactionState.MINOR) {
3158                        future.complete(CompactionState.MAJOR_AND_MINOR);
3159                      } else {
3160                        state = CompactionState.MAJOR;
3161                      }
3162                      break;
3163                    case MINOR:
3164                      if (state == CompactionState.MAJOR) {
3165                        future.complete(CompactionState.MAJOR_AND_MINOR);
3166                      } else {
3167                        state = CompactionState.MINOR;
3168                      }
3169                      break;
3170                    case NONE:
3171                    default:
3172                  }
3173                }
3174                if (!future.isDone()) {
3175                  future.complete(state);
3176                }
3177              }
3178            });
3179        });
3180        break;
3181      default:
3182        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3183    }
3184
3185    return future;
3186  }
3187
3188  @Override
3189  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3190    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3191    addListener(getRegionLocation(regionName), (location, err) -> {
3192      if (err != null) {
3193        future.completeExceptionally(err);
3194        return;
3195      }
3196      ServerName serverName = location.getServerName();
3197      if (serverName == null) {
3198        future
3199          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3200        return;
3201      }
3202      addListener(
3203        this.<GetRegionInfoResponse> newAdminCaller()
3204          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3205            GetRegionInfoResponse> adminCall(controller, stub,
3206              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3207                true),
3208              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3209          .serverName(serverName).call(),
3210        (resp2, err2) -> {
3211          if (err2 != null) {
3212            future.completeExceptionally(err2);
3213          } else {
3214            if (resp2.hasCompactionState()) {
3215              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3216            } else {
3217              future.complete(CompactionState.NONE);
3218            }
3219          }
3220        });
3221    });
3222    return future;
3223  }
3224
3225  @Override
3226  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3227    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3228      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3229    return this.<Optional<Long>> newMasterCaller()
3230      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3231        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3232          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3233          ProtobufUtil::toOptionalTimestamp))
3234      .call();
3235  }
3236
3237  @Override
3238  public CompletableFuture<Optional<Long>>
3239    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3240    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3241    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3242    addListener(getRegionInfo(regionName), (region, err) -> {
3243      if (err != null) {
3244        future.completeExceptionally(err);
3245        return;
3246      }
3247      MajorCompactionTimestampForRegionRequest.Builder builder =
3248        MajorCompactionTimestampForRegionRequest.newBuilder();
3249      builder.setRegion(
3250        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3251      addListener(this.<Optional<Long>> newMasterCaller()
3252        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3253          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3254            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3255            ProtobufUtil::toOptionalTimestamp))
3256        .call(), (timestamp, err2) -> {
3257          if (err2 != null) {
3258            future.completeExceptionally(err2);
3259          } else {
3260            future.complete(timestamp);
3261          }
3262        });
3263    });
3264    return future;
3265  }
3266
3267  @Override
3268  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3269    List<String> serverNamesList) {
3270    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3271    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3272      if (err != null) {
3273        future.completeExceptionally(err);
3274        return;
3275      }
3276      // Accessed by multiple threads.
3277      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3278      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3279      serverNames.stream().forEach(serverName -> {
3280        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3281          if (err2 != null) {
3282            future.completeExceptionally(unwrapCompletionException(err2));
3283          } else {
3284            serverStates.put(serverName, serverState);
3285          }
3286        }));
3287      });
3288      addListener(
3289        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3290        (ret, err3) -> {
3291          if (!future.isCompletedExceptionally()) {
3292            if (err3 != null) {
3293              future.completeExceptionally(err3);
3294            } else {
3295              future.complete(serverStates);
3296            }
3297          }
3298        });
3299    });
3300    return future;
3301  }
3302
3303  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3304    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3305    if (serverNamesList.isEmpty()) {
3306      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3307        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3308      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3309        if (err != null) {
3310          future.completeExceptionally(err);
3311        } else {
3312          future.complete(clusterMetrics.getServersName());
3313        }
3314      });
3315      return future;
3316    } else {
3317      List<ServerName> serverList = new ArrayList<>();
3318      for (String regionServerName : serverNamesList) {
3319        ServerName serverName = null;
3320        try {
3321          serverName = ServerName.valueOf(regionServerName);
3322        } catch (Exception e) {
3323          future.completeExceptionally(
3324            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3325        }
3326        if (serverName == null) {
3327          future.completeExceptionally(
3328            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3329        } else {
3330          serverList.add(serverName);
3331        }
3332      }
3333      future.complete(serverList);
3334    }
3335    return future;
3336  }
3337
3338  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3339    return this.<Boolean> newAdminCaller().serverName(serverName)
3340      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3341        Boolean> adminCall(controller, stub,
3342          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3343          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3344      .call();
3345  }
3346
3347  @Override
3348  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3349    return this.<Boolean> newMasterCaller()
3350      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3351        Boolean> call(controller, stub,
3352          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3353          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3354          (resp) -> resp.getPrevBalanceValue()))
3355      .call();
3356  }
3357
3358  @Override
3359  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3360    return this.<BalanceResponse> newMasterCaller()
3361      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3362        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3363          (s, c, req, done) -> s.balance(c, req, done),
3364          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3365      .call();
3366  }
3367
3368  @Override
3369  public CompletableFuture<Boolean> isBalancerEnabled() {
3370    return this.<Boolean> newMasterCaller()
3371      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3372        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3373          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3374      .call();
3375  }
3376
3377  @Override
3378  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3379    return this.<Boolean> newMasterCaller()
3380      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3381        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3382          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3383          (resp) -> resp.getPrevNormalizerValue()))
3384      .call();
3385  }
3386
3387  @Override
3388  public CompletableFuture<Boolean> isNormalizerEnabled() {
3389    return this.<Boolean> newMasterCaller()
3390      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3391        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3392          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3393      .call();
3394  }
3395
3396  @Override
3397  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3398    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3399  }
3400
3401  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3402    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3403      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3404  }
3405
3406  @Override
3407  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3408    return this.<Boolean> newMasterCaller()
3409      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3410        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3411          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3412          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3413          (resp) -> resp.getPrevValue()))
3414      .call();
3415  }
3416
3417  @Override
3418  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3419    return this.<Boolean> newMasterCaller()
3420      .action(
3421        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3422          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3423            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3424      .call();
3425  }
3426
3427  @Override
3428  public CompletableFuture<Boolean> runCleanerChore() {
3429    return this.<Boolean> newMasterCaller()
3430      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3431        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3432          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3433          (resp) -> resp.getCleanerChoreRan()))
3434      .call();
3435  }
3436
3437  @Override
3438  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3439    return this.<Boolean> newMasterCaller()
3440      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3441        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3442          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3443      .call();
3444  }
3445
3446  @Override
3447  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3448    return this.<Boolean> newMasterCaller()
3449      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3450        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3451          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3452          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3453      .call();
3454  }
3455
3456  @Override
3457  public CompletableFuture<Integer> runCatalogJanitor() {
3458    return this.<Integer> newMasterCaller()
3459      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3460        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3461          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3462      .call();
3463  }
3464
3465  @Override
3466  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3467    ServiceCaller<S, R> callable) {
3468    MasterCoprocessorRpcChannelImpl channel =
3469      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3470    S stub = stubMaker.apply(channel);
3471    CompletableFuture<R> future = new CompletableFuture<>();
3472    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3473    callable.call(stub, controller, resp -> {
3474      if (controller.failed()) {
3475        future.completeExceptionally(controller.getFailed());
3476      } else {
3477        future.complete(resp);
3478      }
3479    });
3480    return future;
3481  }
3482
3483  @Override
3484  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3485    ServiceCaller<S, R> callable, ServerName serverName) {
3486    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3487      this.<Message> newServerCaller().serverName(serverName));
3488    S stub = stubMaker.apply(channel);
3489    CompletableFuture<R> future = new CompletableFuture<>();
3490    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3491    callable.call(stub, controller, resp -> {
3492      if (controller.failed()) {
3493        future.completeExceptionally(controller.getFailed());
3494      } else {
3495        future.complete(resp);
3496      }
3497    });
3498    return future;
3499  }
3500
3501  @Override
3502  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3503    return this.<List<ServerName>> newMasterCaller()
3504      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3505        List<ServerName>> call(controller, stub,
3506          RequestConverter.buildClearDeadServersRequest(servers),
3507          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3508          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3509      .call();
3510  }
3511
3512  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3513    return this.connection.callerFactory.<T> serverRequest()
3514      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3515      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3516      .pause(pauseNs, TimeUnit.NANOSECONDS)
3517      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3518      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3519  }
3520
3521  @Override
3522  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3523    if (tableName == null) {
3524      return failedFuture(new IllegalArgumentException("Table name is null"));
3525    }
3526    CompletableFuture<Void> future = new CompletableFuture<>();
3527    addListener(tableExists(tableName), (exist, err) -> {
3528      if (err != null) {
3529        future.completeExceptionally(err);
3530        return;
3531      }
3532      if (!exist) {
3533        future.completeExceptionally(new TableNotFoundException(
3534          "Table '" + tableName.getNameAsString() + "' does not exists."));
3535        return;
3536      }
3537      addListener(getTableSplits(tableName), (splits, err1) -> {
3538        if (err1 != null) {
3539          future.completeExceptionally(err1);
3540        } else {
3541          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3542            if (err2 != null) {
3543              future.completeExceptionally(err2);
3544            } else {
3545              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3546                if (err3 != null) {
3547                  future.completeExceptionally(err3);
3548                } else {
3549                  future.complete(result3);
3550                }
3551              });
3552            }
3553          });
3554        }
3555      });
3556    });
3557    return future;
3558  }
3559
3560  @Override
3561  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3562    if (tableName == null) {
3563      return failedFuture(new IllegalArgumentException("Table name is null"));
3564    }
3565    CompletableFuture<Void> future = new CompletableFuture<>();
3566    addListener(tableExists(tableName), (exist, err) -> {
3567      if (err != null) {
3568        future.completeExceptionally(err);
3569        return;
3570      }
3571      if (!exist) {
3572        future.completeExceptionally(new TableNotFoundException(
3573          "Table '" + tableName.getNameAsString() + "' does not exists."));
3574        return;
3575      }
3576      addListener(setTableReplication(tableName, false), (result, err2) -> {
3577        if (err2 != null) {
3578          future.completeExceptionally(err2);
3579        } else {
3580          future.complete(result);
3581        }
3582      });
3583    });
3584    return future;
3585  }
3586
3587  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3588    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3589    addListener(
3590      getRegions(tableName).thenApply(regions -> regions.stream()
3591        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3592      (regions, err2) -> {
3593        if (err2 != null) {
3594          future.completeExceptionally(err2);
3595          return;
3596        }
3597        if (regions.size() == 1) {
3598          future.complete(null);
3599        } else {
3600          byte[][] splits = new byte[regions.size() - 1][];
3601          for (int i = 1; i < regions.size(); i++) {
3602            splits[i - 1] = regions.get(i).getStartKey();
3603          }
3604          future.complete(splits);
3605        }
3606      });
3607    return future;
3608  }
3609
3610  /**
3611   * Connect to peer and check the table descriptor on peer:
3612   * <ol>
3613   * <li>Create the same table on peer when not exist.</li>
3614   * <li>Throw an exception if the table already has replication enabled on any of the column
3615   * families.</li>
3616   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3617   * </ol>
3618   * @param tableName name of the table to sync to the peer
3619   * @param splits    table split keys
3620   */
3621  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3622    byte[][] splits) {
3623    CompletableFuture<Void> future = new CompletableFuture<>();
3624    addListener(listReplicationPeers(), (peers, err) -> {
3625      if (err != null) {
3626        future.completeExceptionally(err);
3627        return;
3628      }
3629      if (peers == null || peers.size() <= 0) {
3630        future.completeExceptionally(
3631          new IllegalArgumentException("Found no peer cluster for replication."));
3632        return;
3633      }
3634      List<CompletableFuture<Void>> futures = new ArrayList<>();
3635      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3636        .forEach(peer -> {
3637          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3638        });
3639      addListener(
3640        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3641        (result, err2) -> {
3642          if (err2 != null) {
3643            future.completeExceptionally(err2);
3644          } else {
3645            future.complete(result);
3646          }
3647        });
3648    });
3649    return future;
3650  }
3651
3652  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3653    ReplicationPeerDescription peer) {
3654    Configuration peerConf = null;
3655    try {
3656      peerConf =
3657        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3658    } catch (IOException e) {
3659      return failedFuture(e);
3660    }
3661    CompletableFuture<Void> future = new CompletableFuture<>();
3662    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3663      if (err != null) {
3664        future.completeExceptionally(err);
3665        return;
3666      }
3667      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3668        if (err1 != null) {
3669          future.completeExceptionally(err1);
3670          return;
3671        }
3672        AsyncAdmin peerAdmin = conn.getAdmin();
3673        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3674          if (err2 != null) {
3675            future.completeExceptionally(err2);
3676            return;
3677          }
3678          if (!exist) {
3679            CompletableFuture<Void> createTableFuture = null;
3680            if (splits == null) {
3681              createTableFuture = peerAdmin.createTable(tableDesc);
3682            } else {
3683              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3684            }
3685            addListener(createTableFuture, (result, err3) -> {
3686              if (err3 != null) {
3687                future.completeExceptionally(err3);
3688              } else {
3689                future.complete(result);
3690              }
3691            });
3692          } else {
3693            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3694              (result, err4) -> {
3695                if (err4 != null) {
3696                  future.completeExceptionally(err4);
3697                } else {
3698                  future.complete(result);
3699                }
3700              });
3701          }
3702        });
3703      });
3704    });
3705    return future;
3706  }
3707
3708  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3709    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3710    CompletableFuture<Void> future = new CompletableFuture<>();
3711    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3712      if (err != null) {
3713        future.completeExceptionally(err);
3714        return;
3715      }
3716      if (peerTableDesc == null) {
3717        future.completeExceptionally(
3718          new IllegalArgumentException("Failed to get table descriptor for table "
3719            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3720        return;
3721      }
3722      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3723        future.completeExceptionally(new IllegalArgumentException(
3724          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3725            + ", but the table descriptors are not same when compared with source cluster."
3726            + " Thus can not enable the table's replication switch."));
3727        return;
3728      }
3729      future.complete(null);
3730    });
3731    return future;
3732  }
3733
3734  /**
3735   * Set the table's replication switch if the table's replication switch is already not set.
3736   * @param tableName name of the table
3737   * @param enableRep is replication switch enable or disable
3738   */
3739  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3740    CompletableFuture<Void> future = new CompletableFuture<>();
3741    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3742      if (err != null) {
3743        future.completeExceptionally(err);
3744        return;
3745      }
3746      if (!tableDesc.matchReplicationScope(enableRep)) {
3747        int scope =
3748          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3749        TableDescriptor newTableDesc =
3750          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3751        addListener(modifyTable(newTableDesc), (result, err2) -> {
3752          if (err2 != null) {
3753            future.completeExceptionally(err2);
3754          } else {
3755            future.complete(result);
3756          }
3757        });
3758      } else {
3759        future.complete(null);
3760      }
3761    });
3762    return future;
3763  }
3764
3765  @Override
3766  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3767    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3768    request.setPeerId(peerId);
3769    return this.<Boolean> newMasterCaller()
3770      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3771        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3772          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3773          resp -> resp.getIsEnabled()))
3774      .call();
3775  }
3776
3777  @Override
3778  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3779    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3780    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3781      if (err != null) {
3782        future.completeExceptionally(err);
3783        return;
3784      }
3785      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3786        locations.stream().filter(l -> l.getRegion() != null)
3787          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3788          .collect(Collectors.groupingBy(l -> l.getServerName(),
3789            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3790      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3791      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3792      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3793        futures
3794          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3795            if (err2 != null) {
3796              future.completeExceptionally(unwrapCompletionException(err2));
3797            } else {
3798              aggregator.append(stats);
3799            }
3800          }));
3801      }
3802      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3803        (ret, err3) -> {
3804          if (err3 != null) {
3805            future.completeExceptionally(unwrapCompletionException(err3));
3806          } else {
3807            future.complete(aggregator.sum());
3808          }
3809        });
3810    });
3811    return future;
3812  }
3813
3814  @Override
3815  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3816    boolean preserveSplits) {
3817    CompletableFuture<Void> future = new CompletableFuture<>();
3818    addListener(tableExists(tableName), (exist, err) -> {
3819      if (err != null) {
3820        future.completeExceptionally(err);
3821        return;
3822      }
3823      if (!exist) {
3824        future.completeExceptionally(new TableNotFoundException(tableName));
3825        return;
3826      }
3827      addListener(tableExists(newTableName), (exist1, err1) -> {
3828        if (err1 != null) {
3829          future.completeExceptionally(err1);
3830          return;
3831        }
3832        if (exist1) {
3833          future.completeExceptionally(new TableExistsException(newTableName));
3834          return;
3835        }
3836        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3837          if (err2 != null) {
3838            future.completeExceptionally(err2);
3839            return;
3840          }
3841          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3842          if (preserveSplits) {
3843            addListener(getTableSplits(tableName), (splits, err3) -> {
3844              if (err3 != null) {
3845                future.completeExceptionally(err3);
3846              } else {
3847                addListener(
3848                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3849                  (result, err4) -> {
3850                    if (err4 != null) {
3851                      future.completeExceptionally(err4);
3852                    } else {
3853                      future.complete(result);
3854                    }
3855                  });
3856              }
3857            });
3858          } else {
3859            addListener(createTable(newTableDesc), (result, err5) -> {
3860              if (err5 != null) {
3861                future.completeExceptionally(err5);
3862              } else {
3863                future.complete(result);
3864              }
3865            });
3866          }
3867        });
3868      });
3869    });
3870    return future;
3871  }
3872
3873  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3874    List<RegionInfo> hris) {
3875    return this.<CacheEvictionStats> newAdminCaller()
3876      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
3877        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
3878          RequestConverter.buildClearRegionBlockCacheRequest(hris),
3879          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3880          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3881      .serverName(serverName).call();
3882  }
3883
3884  @Override
3885  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3886    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3887      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
3888        Boolean> call(controller, stub,
3889          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3890          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3891          resp -> resp.getPreviousRpcThrottleEnabled()))
3892      .call();
3893    return future;
3894  }
3895
3896  @Override
3897  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3898    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3899      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
3900        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3901          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3902          resp -> resp.getRpcThrottleEnabled()))
3903      .call();
3904    return future;
3905  }
3906
3907  @Override
3908  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3909    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3910      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
3911        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
3912          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3913            .build(),
3914          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3915          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3916      .call();
3917    return future;
3918  }
3919
3920  @Override
3921  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3922    return this.<Map<TableName, Long>> newMasterCaller()
3923      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
3924        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
3925          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3926          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3927          resp -> resp.getSizesList().stream().collect(Collectors
3928            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3929      .call();
3930  }
3931
3932  @Override
3933  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
3934    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
3935    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3936      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
3937        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
3938          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3939          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3940          resp -> resp.getSnapshotsList().stream()
3941            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3942              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3943      .serverName(serverName).call();
3944  }
3945
3946  private CompletableFuture<SpaceQuotaSnapshot>
3947    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3948    return this.<SpaceQuotaSnapshot> newMasterCaller()
3949      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
3950        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
3951          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3952      .call();
3953  }
3954
3955  @Override
3956  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3957    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3958      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3959      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3960  }
3961
3962  @Override
3963  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3964    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3965    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3966      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3967      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3968  }
3969
3970  @Override
3971  public CompletableFuture<Void> grant(UserPermission userPermission,
3972    boolean mergeExistingPermissions) {
3973    return this.<Void> newMasterCaller()
3974      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
3975        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3976        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3977      .call();
3978  }
3979
3980  @Override
3981  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3982    return this.<Void> newMasterCaller()
3983      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3984        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3985        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3986      .call();
3987  }
3988
3989  @Override
3990  public CompletableFuture<List<UserPermission>>
3991    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3992    return this.<List<UserPermission>> newMasterCaller()
3993      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
3994        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
3995          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3996          (s, c, req, done) -> s.getUserPermissions(c, req, done),
3997          resp -> resp.getUserPermissionList().stream()
3998            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3999            .collect(Collectors.toList())))
4000      .call();
4001  }
4002
4003  @Override
4004  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4005    List<Permission> permissions) {
4006    return this.<List<Boolean>> newMasterCaller()
4007      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4008        List<Boolean>> call(controller, stub,
4009          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4010          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4011          resp -> resp.getHasUserPermissionList()))
4012      .call();
4013  }
4014
4015  @Override
4016  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4017    return this.<Boolean> newMasterCaller()
4018      .action((controller, stub) -> this.call(controller, stub,
4019        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4020        MasterService.Interface::switchSnapshotCleanup,
4021        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4022      .call();
4023  }
4024
4025  @Override
4026  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4027    return this.<Boolean> newMasterCaller()
4028      .action((controller, stub) -> this.call(controller, stub,
4029        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4030        MasterService.Interface::isSnapshotCleanupEnabled,
4031        IsSnapshotCleanupEnabledResponse::getEnabled))
4032      .call();
4033  }
4034
4035  @Override
4036  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4037    return this.<Void> newMasterCaller()
4038      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4039        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4040        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4041      .call();
4042  }
4043
4044  @Override
4045  public CompletableFuture<Void> addRSGroup(String groupName) {
4046    return this.<Void> newMasterCaller()
4047      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4048        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4049        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4050      .call();
4051  }
4052
4053  @Override
4054  public CompletableFuture<Void> removeRSGroup(String groupName) {
4055    return this.<Void> newMasterCaller()
4056      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4057        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4058        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4059      .call();
4060  }
4061
4062  @Override
4063  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4064    BalanceRequest request) {
4065    return this.<BalanceResponse> newMasterCaller()
4066      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4067        BalanceResponse> call(controller, stub,
4068          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4069          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4070      .call();
4071  }
4072
4073  @Override
4074  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4075    return this.<List<RSGroupInfo>> newMasterCaller()
4076      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4077        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4078          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4079            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4080      .call();
4081  }
4082
4083  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4084    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4085    final String logType) {
4086    if (CollectionUtils.isEmpty(serverNames)) {
4087      return CompletableFuture.completedFuture(Collections.emptyList());
4088    }
4089    return CompletableFuture.supplyAsync(() -> serverNames
4090      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4091        filterParams, limit, logType))
4092      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4093  }
4094
4095  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4096    Map<String, Object> filterParams, int limit, String logType) {
4097    return this.<List<LogEntry>> newAdminCaller()
4098      .action((controller, stub) -> this.adminCall(controller, stub,
4099        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4100        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4101      .serverName(serverName).call();
4102  }
4103
4104  @Override
4105  public CompletableFuture<List<Boolean>>
4106    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4107    if (CollectionUtils.isEmpty(serverNames)) {
4108      return CompletableFuture.completedFuture(Collections.emptyList());
4109    }
4110    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4111      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4112    return convertToFutureOfList(clearSlowLogResponseList);
4113  }
4114
4115  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4116    return this.<Boolean> newAdminCaller()
4117      .action((controller, stub) -> this.adminCall(controller, stub,
4118        RequestConverter.buildClearSlowLogResponseRequest(),
4119        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4120      .serverName(serverName).call();
4121  }
4122
4123  private static <T> CompletableFuture<List<T>>
4124    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4125    CompletableFuture<Void> allDoneFuture =
4126      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4127    return allDoneFuture
4128      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4129  }
4130
4131  @Override
4132  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4133    return this.<List<TableName>> newMasterCaller()
4134      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4135        List<TableName>> call(controller, stub,
4136          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4137          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4138            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4139      .call();
4140  }
4141
4142  @Override
4143  public CompletableFuture<Pair<List<String>, List<TableName>>>
4144    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4145    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4146      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4147        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4148        Pair<List<String>, List<TableName>>> call(controller, stub,
4149          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4150            .build(),
4151          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4152          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4153            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4154      .call();
4155  }
4156
4157  @Override
4158  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4159    return this.<RSGroupInfo> newMasterCaller()
4160      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4161        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4162          GetRSGroupInfoOfServerRequest.newBuilder()
4163            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4164              .setPort(hostPort.getPort()).build())
4165            .build(),
4166          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4167          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4168      .call();
4169  }
4170
4171  @Override
4172  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4173    return this.<Void> newMasterCaller()
4174      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4175        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4176        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4177      .call();
4178  }
4179
4180  @Override
4181  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4182    CompletableFuture<Void> future = new CompletableFuture<>();
4183    for (TableName tableName : tables) {
4184      addListener(tableExists(tableName), (exist, err) -> {
4185        if (err != null) {
4186          future.completeExceptionally(err);
4187          return;
4188        }
4189        if (!exist) {
4190          future.completeExceptionally(new TableNotFoundException(tableName));
4191          return;
4192        }
4193      });
4194    }
4195    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4196      if (err != null) {
4197        future.completeExceptionally(err);
4198        return;
4199      }
4200      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4201        future.complete(null);
4202        return;
4203      }
4204      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4205      for (TableDescriptor td : tableDescriptions) {
4206        newTableDescriptors
4207          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4208      }
4209      addListener(
4210        CompletableFuture.allOf(
4211          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4212        (v, e) -> {
4213          if (e != null) {
4214            future.completeExceptionally(e);
4215          } else {
4216            future.complete(v);
4217          }
4218        });
4219    });
4220    return future;
4221  }
4222
4223  @Override
4224  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4225    return this.<RSGroupInfo> newMasterCaller()
4226      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4227        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4228          GetRSGroupInfoOfTableRequest.newBuilder()
4229            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4230          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4231          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4232      .call();
4233  }
4234
4235  @Override
4236  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4237    return this.<RSGroupInfo> newMasterCaller()
4238      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4239        RSGroupInfo> call(controller, stub,
4240          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4241          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4242          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4243      .call();
4244  }
4245
4246  @Override
4247  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4248    return this.<Void> newMasterCaller()
4249      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4250        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4251          .setNewRsgroupName(newName).build(),
4252        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4253      .call();
4254  }
4255
4256  @Override
4257  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4258    Map<String, String> configuration) {
4259    UpdateRSGroupConfigRequest.Builder request =
4260      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4261    if (configuration != null) {
4262      configuration.entrySet().forEach(e -> request.addConfiguration(
4263        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4264    }
4265    return this.<Void> newMasterCaller()
4266      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4267        Void> call(controller, stub, request.build(),
4268          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4269      .call();
4270  }
4271
4272  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4273    return this.<List<LogEntry>> newMasterCaller()
4274      .action((controller, stub) -> this.call(controller, stub,
4275        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4276        ProtobufUtil::toBalancerDecisionResponse))
4277      .call();
4278  }
4279
4280  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4281    return this.<List<LogEntry>> newMasterCaller()
4282      .action((controller, stub) -> this.call(controller, stub,
4283        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4284        ProtobufUtil::toBalancerRejectionResponse))
4285      .call();
4286  }
4287
4288  @Override
4289  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4290    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4291    if (logType == null || serverType == null) {
4292      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4293    }
4294    switch (logType) {
4295      case "SLOW_LOG":
4296      case "LARGE_LOG":
4297        if (ServerType.MASTER.equals(serverType)) {
4298          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4299        }
4300        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4301      case "BALANCER_DECISION":
4302        if (ServerType.REGION_SERVER.equals(serverType)) {
4303          throw new IllegalArgumentException(
4304            "Balancer Decision logs are not maintained by HRegionServer");
4305        }
4306        return getBalancerDecisions(limit);
4307      case "BALANCER_REJECTION":
4308        if (ServerType.REGION_SERVER.equals(serverType)) {
4309          throw new IllegalArgumentException(
4310            "Balancer Rejection logs are not maintained by HRegionServer");
4311        }
4312        return getBalancerRejections(limit);
4313      default:
4314        return CompletableFuture.completedFuture(Collections.emptyList());
4315    }
4316  }
4317
4318  @Override
4319  public CompletableFuture<Void> flushMasterStore() {
4320    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4321    return this.<Void> newMasterCaller()
4322      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4323        Void> call(controller, stub, request.build(),
4324          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4325      .call();
4326  }
4327}