diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java index 854d5a2107d9..52af64189e05 100755 --- a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java @@ -42,6 +42,7 @@ import javax.inject.Inject; import javax.naming.ConfigurationException; +import javax.persistence.EntityExistsException; import com.cloud.api.ApiDBUtils; import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao; @@ -3981,7 +3982,7 @@ public NicProfile addVmToNetwork(final VirtualMachine vm, final Network network, if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance VmWorkJobVO placeHolder = null; - placeHolder = createPlaceHolderWork(vm.getId()); + placeHolder = createPlaceHolderWork(vm.getId(), network.getUuid()); try { return orchestrateAddVmToNetwork(vm, network, requested); } finally { @@ -4021,10 +4022,23 @@ public NicProfile addVmToNetwork(final VirtualMachine vm, final Network network, } } + /** + * duplicated in {@see UserVmManagerImpl} for a {@see UserVmVO} + */ + private void checkIfNetworkExistsForVM(VirtualMachine virtualMachine, Network network) { + List allNics = _nicsDao.listByVmId(virtualMachine.getId()); + for (NicVO nic : allNics) { + if (nic.getNetworkId() == network.getId()) { + throw new CloudRuntimeException("A NIC already exists for VM:" + virtualMachine.getInstanceName() + " in network: " + network.getUuid()); + } + } + } + private NicProfile orchestrateAddVmToNetwork(final VirtualMachine vm, final Network network, final NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { final CallContext cctx = CallContext.current(); + checkIfNetworkExistsForVM(vm, network); s_logger.debug("Adding vm " + vm + " to network " + network + "; requested nic profile " + requested); final VMInstanceVO vmVO = _vmDao.findById(vm.getId()); final ReservationContext context = new ReservationContextImpl(null, null, cctx.getCallingUser(), cctx.getCallingAccount()); @@ -5385,7 +5399,7 @@ public Outcome migrateVmThroughJobQueue(final String vmUuid, fin Map volumeStorageMap = dest.getStorageForDisks(); if (volumeStorageMap != null) { for (Volume vol : volumeStorageMap.keySet()) { - checkConcurrentJobsPerDatastoreThreshhold(volumeStorageMap.get(vol)); + checkConcurrentJobsPerDatastoreThreshold(volumeStorageMap.get(vol)); } } @@ -5550,7 +5564,7 @@ public Outcome migrateVmForScaleThroughJobQueue( return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } - private void checkConcurrentJobsPerDatastoreThreshhold(final StoragePool destPool) { + private void checkConcurrentJobsPerDatastoreThreshold(final StoragePool destPool) { final Long threshold = VolumeApiService.ConcurrentMigrationsThresholdPerDatastore.value(); if (threshold != null && threshold > 0) { long count = _jobMgr.countPendingJobs("\"storageid\":\"" + destPool.getUuid() + "\"", MigrateVMCmd.class.getName(), MigrateVolumeCmd.class.getName(), MigrateVolumeCmdByAdmin.class.getName()); @@ -5571,7 +5585,7 @@ public Outcome migrateVmStorageThroughJobQueue( Set uniquePoolIds = new HashSet<>(poolIds); for (Long poolId : uniquePoolIds) { StoragePoolVO pool = _storagePoolDao.findById(poolId); - checkConcurrentJobsPerDatastoreThreshhold(pool); + checkConcurrentJobsPerDatastoreThreshold(pool); } final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -5618,35 +5632,61 @@ public Outcome addVmToNetworkThroughJobQueue( final List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), - VmWorkAddVmToNetwork.class.getName()); + VmWorkAddVmToNetwork.class.getName(), network.getUuid()); VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert pendingWorkJobs.size() == 1; + if (pendingWorkJobs.size() > 1) { + s_logger.warn(String.format("The number of jobs to add network %s to vm %s are %d", network.getUuid(), vm.getInstanceName(), pendingWorkJobs.size())); + } workJob = pendingWorkJobs.get(0); } else { + if (s_logger.isTraceEnabled()) { + s_logger.trace(String.format("no jobs to add network %s for vm %s yet", network, vm)); + } - workJob = new VmWorkJobVO(context.getContextId()); + workJob = createVmWorkJobToAddNetwork(vm, network, requested, context, user, account); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); + } - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + private VmWorkJobVO createVmWorkJobToAddNetwork( + VirtualMachine vm, + Network network, + NicProfile requested, + CallContext context, + User user, + Account account) { + VmWorkJobVO workJob; + workJob = new VmWorkJobVO(context.getContextId()); - // save work context info (there are some duplications) - final VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setSecondaryObjectIdentifier(network.getUuid()); + + // save work context info (there are some duplications) + final VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + try { _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } catch (CloudRuntimeException e) { + if (e.getCause() instanceof EntityExistsException) { + String msg = String.format("A job to add a nic for network %s to vm %s already exists", network.getUuid(), vm.getUuid()); + s_logger.warn(msg, e); + } + throw e; } - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - - return new VmJobVirtualMachineOutcome(workJob, vm.getId()); + return workJob; } public Outcome removeNicFromVmThroughJobQueue( @@ -5955,6 +5995,10 @@ public Pair handleVmWorkJob(final VmWork work) throws Ex } private VmWorkJobVO createPlaceHolderWork(final long instanceId) { + return createPlaceHolderWork(instanceId, null); + } + + private VmWorkJobVO createPlaceHolderWork(final long instanceId, String secondaryObjectIdentifier) { final VmWorkJobVO workJob = new VmWorkJobVO(""); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); @@ -5966,6 +6010,9 @@ private VmWorkJobVO createPlaceHolderWork(final long instanceId) { workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(instanceId); + if(StringUtils.isNotBlank(secondaryObjectIdentifier)) { + workJob.setSecondaryObjectIdentifier(secondaryObjectIdentifier); + } workJob.setInitMsid(ManagementServerNode.getManagementServerId()); _workJobDao.persist(workJob); diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql index 64c381e0e7ad..abbf4a036346 100644 --- a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql +++ b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql @@ -791,3 +791,6 @@ ALTER TABLE cloud.user_vm_details MODIFY value varchar(5120) NOT NULL; ALTER TABLE cloud_usage.usage_network DROP PRIMARY KEY, ADD PRIMARY KEY (`account_id`,`zone_id`,`host_id`,`network_id`,`event_time_millis`); ALTER TABLE `cloud`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, `network_id`); ALTER TABLE `cloud_usage`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, `network_id`); + +ALTER TABLE `cloud`.`vm_work_job` ADD COLUMN `secondary_object` char(100) COMMENT 'any additional item that must be checked during queueing' AFTER `vm_instance_id`; +ALTER TABLE cloud.vm_work_job ADD CONSTRAINT vm_work_job_step_and_objects UNIQUE KEY (step,vm_instance_id,secondary_object); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java index 44e39e40291c..89601e6b5d20 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java @@ -32,6 +32,8 @@ public interface VmWorkJobDao extends GenericDao { List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd); + List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd, String secondaryObjectIdentifier); + void updateStep(long workJobId, Step step); void expungeCompletedWorkJobs(Date cutDate); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java index e81ab1ebbf78..497f12d7366e 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java @@ -67,6 +67,7 @@ public void init() { PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ); PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ); PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ); + PendingWorkJobByCommandSearch.and("secondaryObjectIdentifier", PendingWorkJobByCommandSearch.entity().getSecondaryObjectIdentifier(), Op.EQ); PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ); PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ); PendingWorkJobByCommandSearch.done(); @@ -119,6 +120,20 @@ public List listPendingWorkJobs(VirtualMachine.Type type, long inst return this.listBy(sc, filter); } + @Override + public List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd, String secondaryObjectIdentifier) { + + SearchCriteria sc = PendingWorkJobByCommandSearch.create(); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); + sc.setParameters("vmType", type); + sc.setParameters("vmInstanceId", instanceId); + sc.setParameters("secondaryObjectIdentifier", secondaryObjectIdentifier); + sc.setParameters("cmd", jobCmd); + + Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); + return this.listBy(sc, filter); + } + @Override public void updateStep(long workJobId, Step step) { VmWorkJobVO jobVo = findById(workJobId); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java index 9d30c2c87b95..777fcba5a3d6 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java @@ -384,7 +384,7 @@ public void setRemoved(final Date removed) { @Override public String toString() { StringBuffer sb = new StringBuffer(); - sb.append("AsyncJobVO {id:").append(getId()); + sb.append("AsyncJobVO : {id:").append(getId()); sb.append(", userId: ").append(getUserId()); sb.append(", accountId: ").append(getAccountId()); sb.append(", instanceType: ").append(getInstanceType()); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java index ef0ac7daddf7..a8a05d483dc0 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java @@ -58,6 +58,9 @@ boolean updateState() { @Column(name = "vm_instance_id") long vmInstanceId; + @Column(name = "secondary_object") + String secondaryObjectIdentifier; + protected VmWorkJobVO() { } @@ -89,4 +92,25 @@ public long getVmInstanceId() { public void setVmInstanceId(long vmInstanceId) { this.vmInstanceId = vmInstanceId; } + + public String getSecondaryObjectIdentifier() { + return secondaryObjectIdentifier; + } + + public void setSecondaryObjectIdentifier(String secondaryObjectIdentifier) { + this.secondaryObjectIdentifier = secondaryObjectIdentifier; + } + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("VmWorkJobVO : {"). + append(", step: ").append(getStep()). + append(", vmType: ").append(getVmType()). + append(", vmInstanceId: ").append(getVmInstanceId()). + append(", secondaryObjectIdentifier: ").append(getSecondaryObjectIdentifier()). + append(super.toString()). + append("}"); + return sb.toString(); + } + } diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java index 44bb79e38b25..0d5c4ce73263 100644 --- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java +++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java @@ -1384,12 +1384,7 @@ public UserVm addNicToVirtualMachine(AddNicToVMCmd cmd) throws InvalidParameterV Account vmOwner = _accountMgr.getAccount(vmInstance.getAccountId()); _networkModel.checkNetworkPermissions(vmOwner, network); - List allNics = _nicDao.listByVmId(vmInstance.getId()); - for (NicVO nic : allNics) { - if (nic.getNetworkId() == network.getId()) { - throw new CloudRuntimeException("A NIC already exists for VM:" + vmInstance.getInstanceName() + " in network: " + network.getUuid()); - } - } + checkIfNetExistsForVM(vmInstance, network); macAddress = validateOrReplaceMacAddress(macAddress, network.getId()); @@ -1456,10 +1451,22 @@ public UserVm addNicToVirtualMachine(AddNicToVMCmd cmd) throws InvalidParameterV } } CallContext.current().putContextParameter(Nic.class, guestNic.getUuid()); - s_logger.debug("Successful addition of " + network + " from " + vmInstance); + s_logger.debug(String.format("Successful addition of %s from %s through %s", network, vmInstance, guestNic)); return _vmDao.findById(vmInstance.getId()); } + /** + * duplicated in {@see VirtualMachineManagerImpl} for a {@see VMInstanceVO} + */ + private void checkIfNetExistsForVM(VirtualMachine virtualMachine, Network network) { + List allNics = _nicDao.listByVmId(virtualMachine.getId()); + for (NicVO nic : allNics) { + if (nic.getNetworkId() == network.getId()) { + throw new CloudRuntimeException("A NIC already exists for VM:" + virtualMachine.getInstanceName() + " in network: " + network.getUuid()); + } + } + } + /** * If the given MAC address is invalid it replaces the given MAC with the next available MAC address */