Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ public enum Status {
USED_WORKER_GROUP_EXISTS(1402004,
"You can not reassign worker groups to the project, cause these worker groups {0} are already used.",
"Worker组{0}被项目中任务或定时引用,无法重新分配"),
WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT(1402005,
"Worker group [{0}] is not assigned to the project.",
"Worker组[{0}]未分配给项目"),
CREATE_WORKFLOW_LINEAGE_ERROR(1403001, "create workflow lineage error", "创建工作流血缘错误"),
UPDATE_WORKFLOW_LINEAGE_ERROR(1403002, "update workflow lineage error", "更新工作流血缘错误"),
DELETE_WORKFLOW_LINEAGE_ERROR(1403003, "delete workflow lineage error", "删除工作流血缘错误"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
import org.apache.dolphinscheduler.dao.entity.User;
Expand All @@ -43,4 +44,22 @@ public interface ProjectWorkerGroupRelationService {
*/
List<ProjectWorkerGroup> queryAssignedWorkerGroupsByProject(User loginUser, Long projectCode);

/**
* check if worker group is assigned to project
*
* @param projectCode project code
* @param workerGroup worker group name
* @return true if worker group is assigned to project
*/
boolean isWorkerGroupAssignedToProject(Long projectCode, String workerGroup);

/**
* validate worker groups are all assigned to project
*
* @param projectCode project code
* @param workerGroups worker group names to validate
* @throws ServiceException if any worker group is not assigned
*/
void validateWorkerGroupsAssignedToProject(Long projectCode, List<String> workerGroups);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated with WorkerGroupValidator

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated with WorkerGroupValidator

Okay, I will unify the logic.


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectPreferenceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidationContext;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidator;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectPreference;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectPreferenceMapper;
import org.apache.dolphinscheduler.dao.repository.ProjectDao;

import org.apache.commons.lang3.StringUtils;

import java.util.Date;
import java.util.Map;
import java.util.Objects;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -39,6 +46,7 @@
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;

@Service
@Slf4j
Expand All @@ -55,6 +63,9 @@ public class ProjectPreferenceServiceImpl extends BaseServiceImpl
@Autowired
private ProjectDao projectDao;

@Autowired
private WorkerGroupValidator workerGroupValidator;

@Override
public Result updateProjectPreference(User loginUser, long projectCode, String preferences) {
Result result = new Result();
Expand All @@ -67,6 +78,33 @@ public Result updateProjectPreference(User loginUser, long projectCode, String p
.selectOne(new QueryWrapper<ProjectPreference>().lambda().eq(ProjectPreference::getProjectCode,
projectCode));

// Validate workerGroup is assigned to project
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Validate workerGroup is assigned to project

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if (StringUtils.isNotEmpty(preferences)) {
try {
Map<String, Object> preferenceMap =
JSONUtils.parseObject(preferences, new TypeReference<Map<String, Object>>() {
});
if (preferenceMap != null) {
Object workerGroupObj = preferenceMap.get("workerGroup");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using entity instead of hard coding.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using entity instead of hard coding.

good idea

if (workerGroupObj != null) {
String workerGroup = String.valueOf(workerGroupObj);
WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(workerGroup)
.projectCode(projectCode)
.build();
try {
workerGroupValidator.validate(workerGroupContext);
} catch (ServiceException e) {
putMsg(result, Status.WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT, workerGroup);
return result;
}
}
}
} catch (Exception e) {
log.warn("Failed to parse preferences JSON: {}", preferences, e);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw ServiceException here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw ServiceException here.

ok

}
}

Date now = new Date();
if (Objects.isNull(projectPreference)) {
projectPreference = new ProjectPreference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,24 @@ public List<ProjectWorkerGroup> queryAssignedWorkerGroupsByProject(User loginUse
}).distinct().collect(Collectors.toList());
}

/**
* Get all assigned worker group names for a project (internal method, no auth check)
*/
private Set<String> getAllAssignedWorkerGroupNames(Long projectCode) {
Project project = projectDao.queryByCode(projectCode);
Set<String> assignedWorkerGroups = new TreeSet<>();

if (project != null) {
assignedWorkerGroups.addAll(getAllUsedWorkerGroups(project));
}

Set<String> directlyAssignedGroups =
projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(projectCode);
assignedWorkerGroups.addAll(directlyAssignedGroups);

return assignedWorkerGroups;
}

private Set<String> getAllUsedWorkerGroups(Project project) {
Set<String> usedWorkerGroups = new TreeSet<>();
// query all worker groups that tasks depend on
Expand All @@ -226,4 +244,27 @@ private Set<String> getAllUsedWorkerGroups(Project project) {
return usedWorkerGroups;
}

@Override
public boolean isWorkerGroupAssignedToProject(Long projectCode, String workerGroup) {
if (StringUtils.isEmpty(workerGroup)) {
return true;
}
return getAllAssignedWorkerGroupNames(projectCode).contains(workerGroup);
}

@Override
public void validateWorkerGroupsAssignedToProject(Long projectCode, List<String> workerGroups) {
if (CollectionUtils.isEmpty(workerGroups)) {
return;
}

List<String> notAssignedWorkerGroups = workerGroups.stream()
.filter(workerGroup -> !isWorkerGroupAssignedToProject(projectCode, workerGroup))
.collect(Collectors.toList());

if (!notAssignedWorkerGroups.isEmpty()) {
throw new ServiceException(Status.WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT, notAssignedWorkerGroups.toString());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.TenantExistValidator;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidationContext;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidator;
import org.apache.dolphinscheduler.api.vo.ScheduleVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
Expand Down Expand Up @@ -97,6 +99,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private TenantExistValidator tenantExistValidator;

@Autowired
private WorkerGroupValidator workerGroupValidator;

/**
* save schedule
*
Expand Down Expand Up @@ -182,6 +187,13 @@ public Schedule insertSchedule(User loginUser,
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setWorkflowInstancePriority(workflowInstancePriority);

// Validate workerGroup
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Validate workerGroup

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(workerGroup)
.projectCode(projectCode)
.build();
workerGroupValidator.validate(workerGroupContext);
scheduleObj.setWorkerGroup(workerGroup);
scheduleObj.setEnvironmentCode(environmentCode);
scheduleDao.insert(scheduleObj);
Expand Down Expand Up @@ -570,6 +582,12 @@ private Schedule updateSchedule(Schedule schedule, WorkflowDefinition workflowDe
schedule.setFailureStrategy(failureStrategy);
}

// Validate workerGroup
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Validate workerGroup

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(workerGroup)
.projectCode(workflowDefinition.getProjectCode())
.build();
workerGroupValidator.validate(workerGroupContext);
schedule.setWorkerGroup(workerGroup);
schedule.setEnvironmentCode(environmentCode);
schedule.setUpdateTime(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.ProjectWorkerGroupRelationService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
Expand Down Expand Up @@ -209,6 +210,9 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
@Autowired
private GlobalParamsValidator globalParamsValidator;

@Autowired
private ProjectWorkerGroupRelationService projectWorkerGroupRelationService;

/**
* create workflow definition
*
Expand Down Expand Up @@ -256,6 +260,10 @@ public WorkflowDefinition createWorkflowDefinition(User loginUser,
globalParamsValidator.validate(globalParams);

List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);

// Validate worker groups in task definitions
validateTaskWorkerGroups(projectCode, taskDefinitionLogs);

List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);

long workflowDefinitionCode = CodeGenerateUtils.genCode();
Expand Down Expand Up @@ -381,6 +389,23 @@ private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinition
}
}

/**
* Validate worker groups in task definitions
*/
private void validateTaskWorkerGroups(long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) {
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
return;
}

List<String> workerGroups = taskDefinitionLogs.stream()
.map(TaskDefinitionLog::getWorkerGroup)
.filter(StringUtils::isNotEmpty)
.distinct()
.collect(Collectors.toList());

projectWorkerGroupRelationService.validateWorkerGroupsAssignedToProject(projectCode, workerGroups);
}
Comment on lines +395 to +405
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of code should put into WorkerGroupValidator

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of code should put into WorkerGroupValidator

Okay, I will unify the logic.


private List<WorkflowTaskRelationLog> generateTaskRelationList(String taskRelationJson,
List<TaskDefinitionLog> taskDefinitionLogs) {
try {
Expand Down Expand Up @@ -626,6 +651,10 @@ public WorkflowDefinition updateWorkflowDefinition(User loginUser,
globalParamsValidator.validate(globalParams);

List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);

// Validate worker groups in task definitions
validateTaskWorkerGroups(projectCode, taskDefinitionLogs);

List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);

WorkflowDefinition workflowDefinition = workflowDefinitionDao.queryByCode(code).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.validator;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Validation context for workerGroup validation
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkerGroupValidationContext {

/**
* The workerGroup to validate
*/
private String workerGroup;

/**
* The project code to check against
*/
private long projectCode;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove all unnessnary comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove all unnessnary comment.

ok

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.validator;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectWorkerGroupRelationService;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* Validator for workerGroup validation
* Checks if the workerGroup is assigned to the project
*/
@Slf4j
@Component
public class WorkerGroupValidator implements IValidator<WorkerGroupValidationContext> {

@Autowired
private ProjectWorkerGroupRelationService projectWorkerGroupRelationService;

@Override
public void validate(final WorkerGroupValidationContext context) {
String workerGroup = context.getWorkerGroup();
long projectCode = context.getProjectCode();

if (StringUtils.isNotEmpty(workerGroup)
&& !projectWorkerGroupRelationService.isWorkerGroupAssignedToProject(projectCode, workerGroup)) {
log.warn("Worker group {} is not assigned to project {}", workerGroup, projectCode);
throw new ServiceException(Status.WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT, workerGroup);
}
}
}
Loading
Loading