001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.Modifier; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.util.NonceKey; 028import org.apache.hadoop.hbase.util.RetryCounter; 029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit; 030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 034import org.apache.hbase.thirdparty.com.google.protobuf.Any; 035import org.apache.hbase.thirdparty.com.google.protobuf.Internal; 036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 037import org.apache.hbase.thirdparty.com.google.protobuf.Message; 038import org.apache.hbase.thirdparty.com.google.protobuf.Parser; 039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 043 044/** 045 * Helper to convert to/from ProcedureProtos 046 */ 047@InterfaceAudience.Private 048public final class ProcedureUtil { 049 private ProcedureUtil() { 050 } 051 052 // ========================================================================== 053 // Reflection helpers to create/validate a Procedure object 054 // ========================================================================== 055 private static Procedure<?> newProcedure(String className) throws BadProcedureException { 056 try { 057 Class<?> clazz = Class.forName(className); 058 if (!Modifier.isPublic(clazz.getModifiers())) { 059 throw new Exception("the " + clazz + " class is not public"); 060 } 061 062 @SuppressWarnings("rawtypes") 063 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor(); 064 assert ctor != null : "no constructor found"; 065 if (!Modifier.isPublic(ctor.getModifiers())) { 066 throw new Exception("the " + clazz + " constructor is not public"); 067 } 068 return ctor.newInstance(); 069 } catch (Exception e) { 070 throw new BadProcedureException( 071 "The procedure class " + className + " must be accessible and have an empty constructor", 072 e); 073 } 074 } 075 076 static void validateClass(Procedure<?> proc) throws BadProcedureException { 077 try { 078 Class<?> clazz = proc.getClass(); 079 if (!Modifier.isPublic(clazz.getModifiers())) { 080 throw new Exception("the " + clazz + " class is not public"); 081 } 082 083 Constructor<?> ctor = clazz.getConstructor(); 084 assert ctor != null; 085 if (!Modifier.isPublic(ctor.getModifiers())) { 086 throw new Exception("the " + clazz + " constructor is not public"); 087 } 088 } catch (Exception e) { 089 throw new BadProcedureException("The procedure class " + proc.getClass().getName() 090 + " must be accessible and have an empty constructor", e); 091 } 092 } 093 094 // ========================================================================== 095 // convert to and from Procedure object 096 // ========================================================================== 097 098 /** 099 * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage 100 * list to store the internal state of the Procedures. 101 */ 102 private static class StateSerializer implements ProcedureStateSerializer { 103 private final ProcedureProtos.Procedure.Builder builder; 104 private int deserializeIndex; 105 106 public StateSerializer(ProcedureProtos.Procedure.Builder builder) { 107 this.builder = builder; 108 } 109 110 @Override 111 public void serialize(Message message) throws IOException { 112 Any packedMessage = Any.pack(message); 113 builder.addStateMessage(packedMessage); 114 } 115 116 @Override 117 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 118 if (deserializeIndex >= builder.getStateMessageCount()) { 119 throw new IOException("Invalid state message index: " + deserializeIndex); 120 } 121 122 try { 123 Any packedMessage = builder.getStateMessage(deserializeIndex++); 124 return packedMessage.unpack(clazz); 125 } catch (InvalidProtocolBufferException e) { 126 throw e.unwrapIOException(); 127 } 128 } 129 } 130 131 /** 132 * A serializer (deserializer) for those Procedures which were serialized before this patch. It 133 * deserializes the old, binary stateData field. 134 */ 135 private static class CompatStateSerializer implements ProcedureStateSerializer { 136 private InputStream inputStream; 137 138 public CompatStateSerializer(InputStream inputStream) { 139 this.inputStream = inputStream; 140 } 141 142 @Override 143 public void serialize(Message message) throws IOException { 144 throw new UnsupportedOperationException(); 145 } 146 147 @SuppressWarnings("unchecked") 148 @Override 149 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 150 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); 151 try { 152 return parser.parseDelimitedFrom(inputStream); 153 } catch (InvalidProtocolBufferException e) { 154 throw e.unwrapIOException(); 155 } 156 } 157 } 158 159 /** 160 * Helper to convert the procedure to protobuf. 161 * <p/> 162 * Used by ProcedureStore implementations. 163 */ 164 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc) 165 throws IOException { 166 Preconditions.checkArgument(proc != null); 167 validateClass(proc); 168 169 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() 170 .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState()) 171 .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate()); 172 173 if (proc.hasParent()) { 174 builder.setParentId(proc.getParentProcId()); 175 } 176 177 if (proc.hasTimeout()) { 178 builder.setTimeout(proc.getTimeout()); 179 } 180 181 if (proc.hasOwner()) { 182 builder.setOwner(proc.getOwner()); 183 } 184 185 final int[] stackIds = proc.getStackIndexes(); 186 if (stackIds != null) { 187 for (int i = 0; i < stackIds.length; ++i) { 188 builder.addStackId(stackIds[i]); 189 } 190 } 191 192 if (proc.hasException()) { 193 RemoteProcedureException exception = proc.getException(); 194 builder.setException( 195 RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); 196 } 197 198 final byte[] result = proc.getResult(); 199 if (result != null) { 200 builder.setResult(UnsafeByteOperations.unsafeWrap(result)); 201 } 202 203 ProcedureStateSerializer serializer = new StateSerializer(builder); 204 proc.serializeStateData(serializer); 205 206 if (proc.getNonceKey() != null) { 207 builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); 208 builder.setNonce(proc.getNonceKey().getNonce()); 209 } 210 211 if (proc.hasLock()) { 212 builder.setLocked(true); 213 } 214 215 if (proc.isBypass()) { 216 builder.setBypass(true); 217 } 218 return builder.build(); 219 } 220 221 /** 222 * Helper to convert the protobuf procedure. 223 * <p/> 224 * Used by ProcedureStore implementations. 225 * <p/> 226 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, 227 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of 228 * it by storing the data only on insert(). 229 */ 230 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto) 231 throws IOException { 232 // Procedure from class name 233 Procedure<?> proc = newProcedure(proto.getClassName()); 234 235 // set fields 236 proc.setProcId(proto.getProcId()); 237 proc.setState(proto.getState()); 238 proc.setSubmittedTime(proto.getSubmittedTime()); 239 proc.setLastUpdate(proto.getLastUpdate()); 240 241 if (proto.hasParentId()) { 242 proc.setParentProcId(proto.getParentId()); 243 } 244 245 if (proto.hasOwner()) { 246 proc.setOwner(proto.getOwner()); 247 } 248 249 if (proto.hasTimeout()) { 250 proc.setTimeout(proto.getTimeout()); 251 } 252 253 if (proto.getStackIdCount() > 0) { 254 proc.setStackIndexes(proto.getStackIdList()); 255 } 256 257 if (proto.hasException()) { 258 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED 259 || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK 260 : "The procedure must be failed (waiting to rollback) or rolledback"; 261 proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); 262 } 263 264 if (proto.hasResult()) { 265 proc.setResult(proto.getResult().toByteArray()); 266 } 267 268 if (proto.getNonce() != HConstants.NO_NONCE) { 269 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); 270 } 271 272 if (proto.getLocked()) { 273 proc.lockedWhenLoading(); 274 } 275 276 if (proto.getBypass()) { 277 proc.bypass(null); 278 } 279 280 ProcedureStateSerializer serializer = null; 281 282 if (proto.getStateMessageCount() > 0) { 283 serializer = new StateSerializer(proto.toBuilder()); 284 } else if (proto.hasStateData()) { 285 InputStream inputStream = proto.getStateData().newInput(); 286 serializer = new CompatStateSerializer(inputStream); 287 } 288 289 if (serializer != null) { 290 proc.deserializeStateData(serializer); 291 } 292 293 return proc; 294 } 295 296 // ========================================================================== 297 // convert from LockedResource object 298 // ========================================================================== 299 300 public static LockServiceProtos.LockedResourceType 301 convertToProtoResourceType(LockedResourceType resourceType) { 302 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); 303 } 304 305 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { 306 return LockServiceProtos.LockType.valueOf(lockType.name()); 307 } 308 309 public static LockServiceProtos.LockedResource 310 convertToProtoLockedResource(LockedResource lockedResource) throws IOException { 311 LockServiceProtos.LockedResource.Builder builder = 312 LockServiceProtos.LockedResource.newBuilder(); 313 314 builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) 315 .setResourceName(lockedResource.getResourceName()) 316 .setLockType(convertToProtoLockType(lockedResource.getLockType())); 317 318 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); 319 320 if (exclusiveLockOwnerProcedure != null) { 321 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = 322 convertToProtoProcedure(exclusiveLockOwnerProcedure); 323 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); 324 } 325 326 builder.setSharedLockCount(lockedResource.getSharedLockCount()); 327 328 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { 329 ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure); 330 builder.addWaitingProcedures(waitingProcedureProto); 331 } 332 333 return builder.build(); 334 } 335 336 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 337 "hbase.procedure.retry.sleep.interval.ms"; 338 339 // default to 1 second 340 public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000; 341 342 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 343 "hbase.procedure.retry.max.sleep.time.ms"; 344 345 // default to 10 minutes 346 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 347 TimeUnit.MINUTES.toMillis(10); 348 349 /** 350 * Get a retry counter for getting the backoff time. We will use the 351 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time 352 * is 10 minutes by default. 353 * <p/> 354 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and 355 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not 356 * timeout. 357 */ 358 public static RetryCounter createRetryCounter(Configuration conf) { 359 long sleepIntervalMs = 360 conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS); 361 long maxSleepTimeMs = 362 conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS); 363 RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs) 364 .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()); 365 return new RetryCounter(retryConfig); 366 } 367 368 public static boolean isFinished(ProcedureProtos.Procedure proc) { 369 if (!proc.hasParentId()) { 370 switch (proc.getState()) { 371 case ROLLEDBACK: 372 case SUCCESS: 373 return true; 374 default: 375 break; 376 } 377 } 378 return false; 379 } 380}