/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 043 044/** 045 * Helper to convert to/from ProcedureProtos 046 */ 047@InterfaceAudience.Private 048public final class ProcedureUtil { 049 050 private ProcedureUtil() { 051 } 052 053 // ========================================================================== 054 // Reflection helpers to create/validate a Procedure object 055 // ========================================================================== 056 private static Procedure<?> newProcedure(String className) throws BadProcedureException { 057 try { 058 Class<?> clazz = Class.forName(className); 059 if (!Modifier.isPublic(clazz.getModifiers())) { 060 throw new Exception("the " + clazz + " class is not public"); 061 } 062 063 @SuppressWarnings("rawtypes") 064 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor(); 065 assert ctor != null : "no constructor found"; 066 if (!Modifier.isPublic(ctor.getModifiers())) { 067 throw new Exception("the " + clazz + " constructor is not public"); 068 } 069 return ctor.newInstance(); 070 } catch (Exception e) { 071 throw new BadProcedureException( 072 "The procedure class " + className + " must be accessible and have an empty constructor", 073 e); 074 } 075 } 076 077 static void validateClass(Procedure<?> proc) throws BadProcedureException { 078 try { 079 Class<?> clazz = proc.getClass(); 080 if (!Modifier.isPublic(clazz.getModifiers())) { 081 throw new Exception("the " + clazz + " class is not public"); 082 } 083 084 Constructor<?> ctor = clazz.getConstructor(); 085 assert ctor != null; 086 if (!Modifier.isPublic(ctor.getModifiers())) { 087 throw new Exception("the " + clazz + " constructor is not public"); 088 } 089 } catch (Exception e) { 090 throw new BadProcedureException("The procedure class " + proc.getClass().getName() 091 + " must be accessible and have an empty 
constructor", e); 092 } 093 } 094 095 // ========================================================================== 096 // convert to and from Procedure object 097 // ========================================================================== 098 099 /** 100 * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage 101 * list to store the internal state of the Procedures. 102 */ 103 private static class StateSerializer implements ProcedureStateSerializer { 104 private final ProcedureProtos.Procedure.Builder builder; 105 private int deserializeIndex; 106 107 public StateSerializer(ProcedureProtos.Procedure.Builder builder) { 108 this.builder = builder; 109 } 110 111 @Override 112 public void serialize(Message message) throws IOException { 113 Any packedMessage = Any.pack(message); 114 builder.addStateMessage(packedMessage); 115 } 116 117 @Override 118 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 119 if (deserializeIndex >= builder.getStateMessageCount()) { 120 throw new IOException("Invalid state message index: " + deserializeIndex); 121 } 122 123 try { 124 Any packedMessage = builder.getStateMessage(deserializeIndex++); 125 return packedMessage.unpack(clazz); 126 } catch (InvalidProtocolBufferException e) { 127 throw e.unwrapIOException(); 128 } 129 } 130 } 131 132 /** 133 * A serializer (deserializer) for those Procedures which were serialized before this patch. It 134 * deserializes the old, binary stateData field. 
135 */ 136 private static class CompatStateSerializer implements ProcedureStateSerializer { 137 private InputStream inputStream; 138 139 public CompatStateSerializer(InputStream inputStream) { 140 this.inputStream = inputStream; 141 } 142 143 @Override 144 public void serialize(Message message) throws IOException { 145 throw new UnsupportedOperationException(); 146 } 147 148 @SuppressWarnings("unchecked") 149 @Override 150 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 151 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); 152 try { 153 return parser.parseDelimitedFrom(inputStream); 154 } catch (InvalidProtocolBufferException e) { 155 throw e.unwrapIOException(); 156 } 157 } 158 } 159 160 /** 161 * Helper to convert the procedure to protobuf. 162 * <p/> 163 * Used by ProcedureStore implementations. 164 */ 165 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc) 166 throws IOException { 167 Preconditions.checkArgument(proc != null); 168 validateClass(proc); 169 170 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() 171 .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState()) 172 .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate()); 173 174 if (proc.hasParent()) { 175 builder.setParentId(proc.getParentProcId()); 176 } 177 178 if (proc.isCriticalSystemTable()) { 179 builder.setIsCryticalSystemTable(true); 180 } 181 182 if (proc.hasTimeout()) { 183 builder.setTimeout(proc.getTimeout()); 184 } 185 186 if (proc.hasOwner()) { 187 builder.setOwner(proc.getOwner()); 188 } 189 190 final int[] stackIds = proc.getStackIndexes(); 191 if (stackIds != null) { 192 for (int i = 0; i < stackIds.length; ++i) { 193 builder.addStackId(stackIds[i]); 194 } 195 } 196 builder.setExecuted(proc.wasExecuted()); 197 198 if (proc.hasException()) { 199 RemoteProcedureException exception = 
proc.getException(); 200 builder.setException( 201 RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); 202 } 203 204 final byte[] result = proc.getResult(); 205 if (result != null) { 206 builder.setResult(UnsafeByteOperations.unsafeWrap(result)); 207 } 208 209 ProcedureStateSerializer serializer = new StateSerializer(builder); 210 proc.serializeStateData(serializer); 211 212 if (proc.getNonceKey() != null) { 213 builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); 214 builder.setNonce(proc.getNonceKey().getNonce()); 215 } 216 217 if (proc.hasLock()) { 218 builder.setLocked(true); 219 } 220 221 if (proc.isBypass()) { 222 builder.setBypass(true); 223 } 224 return builder.build(); 225 } 226 227 /** 228 * Helper to convert the protobuf procedure. 229 * <p/> 230 * Used by ProcedureStore implementations. 231 * <p/> 232 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, 233 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of 234 * it by storing the data only on insert(). 
235 */ 236 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto) 237 throws IOException { 238 // Procedure from class name 239 Procedure<?> proc = newProcedure(proto.getClassName()); 240 241 // set fields 242 proc.setProcId(proto.getProcId()); 243 proc.setState(proto.getState()); 244 proc.setSubmittedTime(proto.getSubmittedTime()); 245 proc.setLastUpdate(proto.getLastUpdate()); 246 247 if (proto.hasParentId()) { 248 proc.setParentProcId(proto.getParentId()); 249 } 250 251 if (proto.hasIsCryticalSystemTable()) { 252 proc.setCriticalSystemTable(proto.getIsCryticalSystemTable()); 253 } 254 255 if (proto.hasOwner()) { 256 proc.setOwner(proto.getOwner()); 257 } 258 259 if (proto.hasTimeout()) { 260 proc.setTimeout(proto.getTimeout()); 261 } 262 263 if (proto.getStackIdCount() > 0) { 264 proc.setStackIndexes(proto.getStackIdList()); 265 } 266 if (proto.getExecuted()) { 267 proc.setExecuted(); 268 } 269 270 if (proto.hasException()) { 271 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED 272 || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK 273 : "The procedure must be failed (waiting to rollback) or rolledback"; 274 proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); 275 } 276 277 if (proto.hasResult()) { 278 proc.setResult(proto.getResult().toByteArray()); 279 } 280 281 if (proto.getNonce() != HConstants.NO_NONCE) { 282 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); 283 } 284 285 if (proto.getLocked()) { 286 proc.lockedWhenLoading(); 287 } 288 289 if (proto.getBypass()) { 290 proc.bypass(null); 291 } 292 293 ProcedureStateSerializer serializer = null; 294 295 if (proto.getStateMessageCount() > 0) { 296 serializer = new StateSerializer(proto.toBuilder()); 297 } else if (proto.hasStateData()) { 298 InputStream inputStream = proto.getStateData().newInput(); 299 serializer = new CompatStateSerializer(inputStream); 300 } 301 302 if (serializer != null) { 303 
proc.deserializeStateData(serializer); 304 } 305 306 return proc; 307 } 308 309 // ========================================================================== 310 // convert from LockedResource object 311 // ========================================================================== 312 313 public static LockServiceProtos.LockedResourceType 314 convertToProtoResourceType(LockedResourceType resourceType) { 315 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); 316 } 317 318 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { 319 return LockServiceProtos.LockType.valueOf(lockType.name()); 320 } 321 322 public static LockServiceProtos.LockedResource 323 convertToProtoLockedResource(LockedResource lockedResource) throws IOException { 324 LockServiceProtos.LockedResource.Builder builder = 325 LockServiceProtos.LockedResource.newBuilder(); 326 327 builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) 328 .setResourceName(lockedResource.getResourceName()) 329 .setLockType(convertToProtoLockType(lockedResource.getLockType())); 330 331 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); 332 333 if (exclusiveLockOwnerProcedure != null) { 334 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = 335 convertToProtoProcedure(exclusiveLockOwnerProcedure); 336 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); 337 } 338 339 builder.setSharedLockCount(lockedResource.getSharedLockCount()); 340 341 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { 342 ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure); 343 builder.addWaitingProcedures(waitingProcedureProto); 344 } 345 346 return builder.build(); 347 } 348 349 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 350 "hbase.procedure.retry.sleep.interval.ms"; 351 352 // default to 1 second 353 public 
static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000; 354 355 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 356 "hbase.procedure.retry.max.sleep.time.ms"; 357 358 // default to 10 minutes 359 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 360 TimeUnit.MINUTES.toMillis(10); 361 362 /** 363 * Get a retry counter for getting the backoff time. We will use the 364 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time 365 * is 10 minutes by default. 366 * <p/> 367 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and 368 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not 369 * timeout. 370 */ 371 public static RetryCounter createRetryCounter(Configuration conf) { 372 long sleepIntervalMs = 373 conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS); 374 long maxSleepTimeMs = 375 conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS); 376 RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs) 377 .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()); 378 return new RetryCounter(retryConfig); 379 } 380 381 public static boolean isFinished(ProcedureProtos.Procedure proc) { 382 if (!proc.hasParentId()) { 383 switch (proc.getState()) { 384 case ROLLEDBACK: 385 case SUCCESS: 386 return true; 387 default: 388 break; 389 } 390 } 391 return false; 392 } 393}