dolphinscheduler-springboot集成

springboot集成dolphinscheduler

說明

為了避免對DolphinScheduler產生過度依賴,實踐中通常不會全面采用其內置的所有任務節點類型。相反,會選擇性地利用DolphinScheduler的HTTP任務節點功能,以此作為工作流執行管理的橋梁,對接并驅動自有項目的業務流程。這種策略不僅確保了流程編排的靈活性與擴展性,還有效減少了對外部調度系統的深度綁定,從而在提升項目自洽能力的同時,保持了良好的系統間解耦。

簡而言之,我們傾向于僅采納DolphinScheduler中的HTTP任務節點,作為調度機制的一部分,來促進我們內部項目工作流的自動化執行。這樣做既能享受DolphinScheduler帶來的調度便利,又避免了全盤接受其所有組件所帶來的潛在風險,實現了更為穩健、可控的項目管理方案。

代碼實現

為了優化與DolphinScheduler的集成,以下是三個關鍵配置類的概述,它們旨在通過初始化接口實現項目及租戶信息的同步通知。值得注意的是,為了確保數據一致性和高效通信,你的Spring Boot應用所使用的數據庫應與DolphinScheduler共享同一數據源。這一策略不僅簡化了數據管理,還促進了實時狀態更新,增強了系統的整體協調性。

簡而言之,我們精心設計了三組配置規則,允許我們的Spring Boot項目無縫對接DolphinScheduler平臺。通過這些配置,項目和租戶的動態變化能夠及時反映到DolphinScheduler中,前提是兩者共用一個數據庫實例。這種架構決策不僅優化了資源分配,還促進了跨系統間的緊密協作,為后續的業務拓展奠定了堅實的基礎。

package cn.com.lyb.data.dev.init;import cn.com.lyb.common.security.annotation.InnerRequest;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.core.web.request.Response;
import cn.com.lyb.core.web.request.ResultWrap;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;@Api(tags = "初始化dolphinscheduler數據庫信息")
@RestController
@RequestMapping("/data-dev")
public class InitializePlugin {@Autowiredprivate DataSource dataSource;@Autowiredprivate DolphinschedulerConfig dolphinschedulerConfig;@GetMapping("/init")@ApiOperation("部署全新的環境可以用此接口,否則會報錯")@InnerRequestpublic Response init(){Connection connection = null;try{connection = dataSource.getConnection();// 項目表String projectSql = "INSERT INTO `dolphinscheduler`.`t_ds_project` (`id`, `name`, `code`, `description`, `user_id`, `flag`, `create_time`, `update_time`) VALUES (1, 'lyb', '" + dolphinschedulerConfig.getProjectCode() + "', '', 1, 1, '2024-06-13 02:49:43', '2024-06-13 02:49:43');";String tokenSql = "INSERT INTO `dolphinscheduler`.`t_ds_access_token` (`id`, `user_id`, `token`, `expire_time`, `create_time`, `update_time`) VALUES (1, 1, '"+dolphinschedulerConfig.getDsdToken()+"', '2039-12-30 10:51:26', '2024-06-13 02:50:37', '2024-06-18 10:00:13');";String tenantSql = "INSERT INTO `dolphinscheduler`.`t_ds_tenant` (`id`, `tenant_code`, `description`, `queue_id`, `create_time`, `update_time`) VALUES (1, 'default', '', 1, '2024-06-13 02:50:20', '2024-06-13 02:50:20');";String userSql = "UPDATE `dolphinscheduler`.`t_ds_user` SET `user_name` = 'admin', `user_password` = '470b9934942620215ad1cb3ac2d48497', `user_type` = 0, `email` = 'xxx@qq.com', `phone` = '', `tenant_id` = 1, `create_time` = '2024-06-12 10:23:37', `update_time` = '2024-06-18 09:59:52', `queue` = '', `state` = 1, `time_zone` = 'Asia/Shanghai' WHERE `id` = 1;";List<String> sqlList = Arrays.asList(projectSql, tenantSql, tokenSql, userSql);connection.setAutoCommit(false);Statement statement = connection.createStatement();for (String sql : sqlList) {statement.addBatch(sql);}statement.executeBatch();connection.commit();statement.close();}catch (Exception e){throw new BizException("初始化報錯");}finally {if(connection != null) {try {connection.close();} catch (SQLException e) {e.printStackTrace();}}}return ResultWrap.ok();}
}
package cn.com.lyb.data.dev.web.config;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;@Configuration
public class RestTemplateConfig {@Value("${xgov.template.connectTimeout}")private int connectTimeout;@Value("${xgov.template.socketTimeout}")private int socketTimeout;@Beanpublic RestTemplate restTemplate() {return new RestTemplate(httpRequestFactory());}@Beanpublic HttpComponentsClientHttpRequestFactory httpRequestFactory() {PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();connectionManager.setMaxTotal(200); // 最大連接數connectionManager.setDefaultMaxPerRoute(20); // 每個路由默認的最大連接數RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout) // 連接超時時間.setSocketTimeout(socketTimeout) // 讀取超時時間.setConnectionRequestTimeout(5000) // 從連接池獲取連接的超時時間.build();CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig).build();return new HttpComponentsClientHttpRequestFactory(httpClient);}
}
package cn.com.dev.data.dev.web.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class DolphinschedulerConfig {@Value("${lyb.dolphinscheduler.server.username}")private String dsdUsername;@Value("${lyb.dolphinscheduler.server.token}")private String dsdToken;@Value("${lyb.dolphinscheduler.server.url}")private String dsdUrl;@Value("${lyb.dolphinscheduler.server.porjectCode}")private String projectCode;@Value("${lyb.dolphinscheduler.server.tenantCode}")private String tenantCode;public String getDsdUsername() {return dsdUsername;}public void setDsdUsername(String dsdUsername) {this.dsdUsername = dsdUsername;}public String getDsdToken() {return dsdToken;}public void setDsdToken(String dsdToken) {this.dsdToken = dsdToken;}public String getDsdUrl() {return dsdUrl;}public void setDsdUrl(String dsdUrl) {this.dsdUrl = dsdUrl;}public String getProjectCode() {return projectCode;}public void setProjectCode(String projectCode) {this.projectCode = projectCode;}public String getTenantCode() {return tenantCode;}public void setTenantCode(String tenantCode) {this.tenantCode = tenantCode;}}

構建一個調用類,該類全面集成了與DolphinScheduler接口的交互邏輯,為我們的應用提供了一層抽象。對于涉及具體業務邏輯的數據封裝細節,此處將不再贅述,旨在保持代碼的清晰度與通用性。

簡言之,我們設計了一個專門的類來處理所有與DolphinScheduler的API調用,確保了業務核心邏輯的獨立性和可維護性。這一封裝策略使得代碼庫更加整潔,同時也提升了開發效率和系統的整體健壯性。

通過這種方式,我們不僅隔離了與外部服務的直接交互,還簡化了業務邏輯的實現,使其更加專注于核心功能,而非調度系統的細節。這樣的架構設計,有助于團隊成員快速理解系統架構,同時也便于未來的功能擴展和系統維護。

package cn.com.lyb.data.dev.web.dolphinscheduler.service;import cn.com.lyb.common.redis.service.RedisService;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.data.dev.enums.ProcessExecutionTypeEnum;
import cn.com.lyb.data.dev.enums.TaskExecutionStatus;
import cn.com.lyb.data.dev.enums.WorkflowExecutionStatus;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import cn.com.lyb.data.dev.workflow.entity.delphinscheduler.TaskDefinition;
import cn.com.lyb.data.dev.workflow.entity.vo.GanttTaskVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ProcessInstanceVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ResponseTaskLog;
import cn.com.lyb.data.dev.workflow.entity.vo.TaskInstanceVO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;@Service
public class DolphinschedulerService {private static final Logger logger = LoggerFactory.getLogger(DolphinschedulerService.class);private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";private static final String X_REQUESTED_WITH = "XMLHttpRequest";@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate DolphinschedulerConfig dolphinschedulerConfig;private static final Boolean DSD_SUCCESS = true;@Autowiredprivate RedisService redisService;private static final String DSD_SESSION_KEY = "DSD_SESSION_KEY";private static final String SUCCESS = "success";private static final String MSG = "msg";// 此種方法適用登錄后獲取SESSION設置到header里面private static final String SESSION_ID = "sessionId";private static final String TOKEN = "token";private static final String DATA = "data";/*** 登錄,返回 sessionId*/public String login() {String sessionValue = redisService.getCacheObject(DSD_SESSION_KEY);if (StringUtils.isNotBlank(sessionValue)) {return sessionValue;}//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());//linkedMultiValueMap.add("userPassword", dolphinschedulerConfig.getDsdPassword());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/login";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);String sessionId = data.get(SESSION_ID).toString();redisService.setCacheObject(DSD_SESSION_KEY, sessionId, 23L, TimeUnit.HOURS);return sessionId;}/*** 創建項目** @return*/public void createProject(String projectName, String description) {// 如果是https登錄可以使用該方法//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("projectName", projectName);linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);logger.info("Azkaban請求信息:" + httpEntity.toString());String url = dolphinschedulerConfig.getDsdUrl() + "/projects";doPostForObject(url, httpEntity);}/*** 項目列表** @param pageNo* @param pageSize* @param searchVal*/public Object projectsPage(Integer pageNo, Integer pageSize, String searchVal) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url;if (StringUtils.isNotBlank(searchVal)) {url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;} else {url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize;}JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}/*** 修改項目** @param code* @param projectName* @param description* @return*/public Object updateProjects(String code, String projectName, String description) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();linkedMultiValueMap.add("projectName", projectName);linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("userName", dolphinschedulerConfig.getDsdUsername());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;JSONObject resultJSON = doPutForObject(url, httpEntity);return resultJSON.get(DATA);}/*** 刪除項目** @param code* @return*/public Object delProjects(String code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;JSONObject resultJSON = doDeleteForObject(url, httpEntity);return resultJSON.get(DATA);}public JSONObject doPostForObject(String url, HttpEntity httpEntity) {logger.info("調用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.POST, httpEntity, String.class);String result = exchange.getBody();logger.info("post類型接口調用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("post類型接口調用失敗:{}", e);throw new BizException(e.getMessage());}}public JSONObject doGetForObject(String url, HttpEntity httpEntity) {logger.info("調用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.GET, httpEntity, String.class);String result = exchange.getBody();logger.info("get類型接口調用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("get類型接口調用失敗:{}", e);throw new BizException(e.getMessage());}}public JSONObject doPutForObject(String url, HttpEntity httpEntity) {logger.info("調用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, String.class);String result = exchange.getBody();logger.info("put類型接口調用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("put類型接口調用失敗:{}", e);throw new BizException(e.getMessage());}}public JSONObject doDeleteForObject(String url, HttpEntity httpEntity) {logger.info("調用url:{}", url);try {ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.DELETE, httpEntity, String.class);String result = exchange.getBody();logger.info("delete類型接口調用返回信息:{}", result);return getJsonObject(result);} catch (BizException be) {throw be;} catch (Exception e) {logger.error("delete類型接口調用失敗:{}", e);throw new BizException(e.getMessage());}}private static JSONObject getJsonObject(String result) {JSONObject resultJSON = JSON.parseObject(result);if (!DSD_SUCCESS.equals(resultJSON.get(SUCCESS))) {logger.error("調用結果返回異常:{}" + result);Integer code = (Integer) resultJSON.get("code");if(code.intValue() == 50019){throw new BizException("流程節點間存在循環依賴");}else if(code.intValue() == 50036){throw new BizException("工作流任務關系參數錯誤");} else {throw new BizException(resultJSON.get(MSG).toString());}}return resultJSON;}/*** 創建工作流** @param name* @param description* @param globalParams* @param locations* @param timeout* @param taskRelationJson* @param taskDefinitionJson* @param otherParamsJson* @param executionType* @return 3.2.0 版本*/public Long createWorkFlow320(String name, String description, String globalParams, String locations, int timeout,String taskRelationJson, String taskDefinitionJson, String otherParamsJson,ProcessExecutionTypeEnum executionType) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", timeout);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("otherParamsJson", otherParamsJson);linkedMultiValueMap.add("globalParams", globalParams);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);Long code = (Long) data.get("code");return code;}/*** 查詢工作流列表** @param pageNo* @param pageSize* @param searchVal* @return*/public Object selectFlowPage(Integer pageNo, Integer pageSize, String searchVal) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url;if (StringUtils.isNotBlank(searchVal)) {url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize + "&searchVal=" + searchVal;} else {url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize;}JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}public Object selectOneFlow(String code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code;JSONObject resultJSON = doGetForObject(url, entity);return resultJSON.get(DATA);}public Long createWorkFlow(String name, String description, String locations,String taskDefinitionJson, String taskRelationJson,String executionType) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", 0);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());linkedMultiValueMap.add("locations", locations);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition";JSONObject resultJSON = doPostForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);Long code = (Long) data.get("code");return code;}public void updateReleaseState(String name, String releaseState, Long code) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("releaseState", releaseState);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + code + "/release";doPostForObject(url, httpEntity);}public List<TaskDefinition> getTaskByWorkflowCode(Long dsdCode) {//SSLUtil.turnOffSslChecking();HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;JSONObject resultJSON = doGetForObject(url, httpEntity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("taskDefinitionList");return jsonArray.toJavaList(TaskDefinition.class);}/*** 刪除工作流** @param codes*/public void delWorkflow(String codes) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("codes", codes);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/batch-delete";doPostForObject(url, httpEntity);}public void updateWorkFlow(Long dsdCode, String name, String description, String locations, String taskDefinitionJson, String taskRelationJson, String executionType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("description", description);linkedMultiValueMap.add("name", name);linkedMultiValueMap.add("taskDefinitionJson", taskDefinitionJson);linkedMultiValueMap.add("taskRelationJson", taskRelationJson);linkedMultiValueMap.add("timeout", 0);linkedMultiValueMap.add("executionType", executionType);linkedMultiValueMap.add("tenantCode", dolphinschedulerConfig.getTenantCode());linkedMultiValueMap.add("locations", locations);HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/process-definition/" + dsdCode;doPutForObject(url, httpEntity);}/*** 運行工作流** @param code*/public void runWorkflow(Long code) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();linkedMultiValueMap.add("processDefinitionCode", code);linkedMultiValueMap.add("failureStrategy", "CONTINUE");linkedMultiValueMap.add("warningType", "NONE");linkedMultiValueMap.add("scheduleTime", getStringDate());HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/start-process-instance";doPostForObject(url, httpEntity);}public String getStringDate() {LocalDateTime currentDateTime = LocalDateTime.now();LocalDateTime startDate = currentDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);LocalDateTime endDate = startDate;DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");String formattedStartDate = startDate.format(formatter);JSONObject jsonObject = new JSONObject();jsonObject.put("complementStartDate", formattedStartDate);jsonObject.put("complementEndDate", formattedStartDate);return jsonObject.toString();}/*** 獲取任務日志** @param id* @return*/public ResponseTaskLog getLog(Integer id, Integer limit, Integer skipLineNum) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);String url = dolphinschedulerConfig.getDsdUrl() + "/log/detail?taskInstanceId=" + id + "&limit=" + limit + "&skipLineNum=" + skipLineNum;JSONObject resultJSON = doGetForObject(url, entity);return JSON.parseObject(resultJSON.get(DATA).toString(), ResponseTaskLog.class);}/*** 重跑任務** @param processInstanceId*/public void operation(Integer processInstanceId, String executeType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();// 1 REPEAT_RUNNING 重跑 2 STOP 停止 3 RECOVER_SUSPENDED_PROCESS 恢復運行 4 PAUSE 暫停linkedMultiValueMap.add("processInstanceId", processInstanceId);switch (executeType) {case "1":addExecutionDetails(linkedMultiValueMap, 1, "REPEAT_RUNNING", "run");break;case "2":linkedMultiValueMap.add("executeType", "STOP");break;case "3":addExecutionDetails(linkedMultiValueMap, 0, "RECOVER_SUSPENDED_PROCESS", "suspend");break;case "4":linkedMultiValueMap.add("executeType", "PAUSE");break;default:throw new BizException("暫不支持該操作");}HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + "/executors/execute";doPostForObject(url, httpEntity);}public void addExecutionDetails(MultiValueMap<String, Object> map, int index, String executeType, String buttonType) {map.add("index", String.valueOf(index));map.add("executeType", executeType);if (buttonType != null) {map.add("buttonType", buttonType);}}public PageInfo<ProcessInstanceVO> processInstances(Long dsdWorkflowCode, String searchVal, Integer pageNum, Integer pageSize,String startDate, String endDate, String stateType) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/process-instances?pageNo=").append(pageNum).append("&pageSize=").append(pageSize).append("&call=").append("1");// 這個必須加,不然刪除工作流后,實例會不見if(null != dsdWorkflowCode){urlBuilder.append("&processDefineCode=").append(dsdWorkflowCode);}if(StringUtils.isNotBlank(searchVal)){urlBuilder.append("&searchVal=").append(searchVal);}supplementaryParameters(startDate, endDate, stateType, urlBuilder);JSONObject resultJSON = doGetForObject(urlBuilder.toString(), entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("totalList");List<ProcessInstanceVO> javaList = jsonArray.toJavaList(ProcessInstanceVO.class);Integer total = (Integer) data.get("total");PageInfo<ProcessInstanceVO> res = new PageInfo<>();res.setList(javaList);res.setTotal(total);return res;}private void supplementaryParameters(String startDate, String endDate, String stateType, StringBuilder urlBuilder) {if (stateType != null && !stateType.isEmpty()) {urlBuilder.append("&stateType=").append(stateType);}if (startDate != null && !startDate.isEmpty()) {urlBuilder.append("&startDate=").append(startDate);}if (endDate != null && !endDate.isEmpty()) {urlBuilder.append("&endDate=").append(endDate);}}public PageInfo<TaskInstanceVO> taskInstances(Integer processInstanceId, String startDate, String endDate,String stateType, int pageNum, int pageSize) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/task-instances?pageNo=").append(pageNum).append("&pageSize=").append(pageSize).append("&processInstanceId=").append(processInstanceId).append("&taskExecuteType=").append("BATCH");supplementaryParameters(startDate, endDate, stateType, urlBuilder);String url = urlBuilder.toString();JSONObject resultJSON = doGetForObject(url, entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("totalList");List<TaskInstanceVO> javaList = jsonArray.toJavaList(TaskInstanceVO.class);Integer total = (Integer) data.get("total");PageInfo<TaskInstanceVO> res = new PageInfo<>();res.setList(javaList);res.setTotal(total);return res;}/*** 獲取工作流執行順序* @param processInstanceId* @return*/public List<GanttTaskVO> viewGantt(Long processInstanceId) {HttpHeaders hs = new HttpHeaders();hs.add("Content-Type", CONTENT_TYPE);hs.add("X-Requested-With", X_REQUESTED_WITH);hs.add(TOKEN, dolphinschedulerConfig.getDsdToken());HttpEntity<String> entity = new HttpEntity<String>(hs);StringBuilder urlBuilder = new StringBuilder(dolphinschedulerConfig.getDsdUrl()).append("/projects/").append(dolphinschedulerConfig.getProjectCode()).append("/process-instances/").append(processInstanceId).append("/view-gantt");String url = urlBuilder.toString();JSONObject resultJSON = doGetForObject(url, entity);JSONObject data = (JSONObject) resultJSON.get(DATA);JSONArray jsonArray = (JSONArray) data.get("tasks");List<GanttTaskVO> javaList = jsonArray.toJavaList(GanttTaskVO.class);return javaList;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/41987.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/41987.shtml
英文地址,請注明出處:http://en.pswp.cn/web/41987.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

信息技術課上的紀律秘訣:營造有序學習環境

信息技術課是學生們探索數字世界的樂園&#xff0c;但同時也是課堂紀律管理的挑戰場。電腦、網絡、游戲等元素可能分散學生的注意力&#xff0c;影響學習效果。本文將分享一些有效的策略&#xff0c;幫助教師在信息技術課上維持課堂紀律&#xff0c;確保教學活動順利進行。 制…

幾何建模基礎-樣條曲線和樣條曲面介紹

1.概念介紹 1.1 樣條曲線的來源 樣條的英語單詞spline來源于可變形的樣條工具&#xff0c;那是一種在造船和工程制圖時用來畫出光滑形狀的工具&#xff1a;富有彈性的均勻細木條/金屬條/有機玻璃條&#xff0c;它圍繞著按指定位置放置的重物或者壓鐵做彈性彎曲&#xff0c;以…

JS實現一個簡單的模糊匹配

1、示例數據如下&#xff1a; // 示例數據 const data [ { name: ‘Alice’, age: 25 }, { name: ‘Bob’, age: 30 }, { name: ‘Charlie’, age: 35 }, { name: ‘David’, age: 40 }, { name: ‘Eve’, age: 45 } ]; 2、模糊匹配函數 // 模糊匹配函數 function fuzzyMatch(…

基于LangChain的RAG開發教程(二)

v1.0官方文檔&#xff1a;https://python.langchain.com/v0.1/docs/get_started/introduction/ 最新文檔&#xff1a;https://python.langchain.com/v0.2/docs/introduction/ LangChain是一個能夠利用大語言模型&#xff08;LLM&#xff0c;Large Language Model&#xff09;能…

植物大戰僵尸融合嫁接版 MAC 版本下載安裝詳細教程

繼植物大戰僵尸雜交版火了之后&#xff0c;PVZ改版可謂是百花齊放&#xff0c;最近又有一個非常好玩的模式被開發出來了&#xff0c;他們稱為《植物大戰僵尸融合嫁接版》 該版本并沒有對植物卡牌做改動&#xff0c;而是可以將任意兩種植物疊放到一起進行融合&#xff0c;產生新…

思路打開!騰訊造了10億個角色,驅動數據合成!7B模型效果打爆了

世界由形形色色的角色構成&#xff0c;每個角色都擁有獨特的知識、經驗、興趣、個性和職業&#xff0c;他們共同制造了豐富多元的知識與文化。 所謂術業有專攻&#xff0c;比如AI科學家專注于構建LLMs,醫務工作者們共建龐大的醫學知識庫&#xff0c;數學家們則偏愛數學公式與定…

lvgl 本地化

生成語言包文件&#xff1a; lv_i18n compile -t en-GB.yml -o ui 正則匹配中文 "[\u4e00-\u9fa5]" _("[\u4e00-\u9fa5]") https://www.cnblogs.com/jerryqi/p/9604828.html 查找多個漢字體的 ("[\u4e00-\u9fa5]"[)]) _($1) "科室:"…

數據分析與挖掘實戰案例-電商產品評論數據情感分析

數據分析與挖掘實戰案例-電商產品評論數據情感分析 文章目錄 數據分析與挖掘實戰案例-電商產品評論數據情感分析1. 背景與挖掘目標2. 分析方法與過程2.1 評論預處理1. 評論去重2. 數據清洗 2.2 評論分詞1. 分詞、詞性標注、去除停用詞2. 提取含名詞的評論3. 繪制詞云查看分詞效…

昇思25天學習打卡營第12天 | LLM原理和實踐:MindNLP ChatGLM-6B StreamChat

1. MindNLP ChatGLM-6B StreamChat 本案例基于MindNLP和ChatGLM-6B實現一個聊天應用。 ChatGLM-6B應該是國內第一個發布的可以在消費級顯卡上進行推理部署的國產開源大模型&#xff0c;2023年3月就發布了。我在23年6月份的時候就在自己的筆記本電腦上部署測試過&#xff0c;當…

UI自動化測試框架:PO 模式+數據驅動(超詳細)

1. PO 設計模式簡介 什么是 PO 模式&#xff1f; PO&#xff08;PageObject&#xff09;設計模式將某個頁面的所有元素對象定位和對元素對象的操作封裝成一個 Page 類&#xff0c;并以頁面為單位來寫測試用例&#xff0c;實現頁面對象和測試用例的分離。 PO 模式的設計思想與…

Python學習中進行條件判斷(if, else, elif)

條件判斷是編程中必不可少的一部分&#xff0c;它讓程序可以根據不同的條件執行不同的代碼塊。在Python中&#xff0c;主要使用if、elif和else語句來實現條件判斷。 基本語法 在Python中&#xff0c;條件判斷的基本語法如下&#xff1a; if condition:# 當condition為True時…

一篇讀懂128陷阱

128陷阱 128陷阱的概念包裝器類自動裝箱自動拆箱128陷阱 Intager源碼equals 128陷阱的概念 首先想要清楚什么是128陷阱&#xff0c;需要了解一些概念 包裝器類 包裝器類&#xff08;Wrapper classes&#xff09;是Java中的一組類&#xff0c;它們允許將基本數據類型&#xf…

NCCL 中的一些輔助debug 知識點

1&#xff0c;調試nccl 啟動kernel的方法 ncclLaunchKernel cuLaunchKernelEx ncclStrongStreamLaunchKernel cudaLaunchKernel ncclLaunchOneRank cudaLaunchKernel 在 nccl lib 中&#xff0c;不存在使用<<<grid, block,,>>> 這種類似方式啟…

算法題型歸類整理及同類題型解法思路總結(持續更新)

1、最優路線 通用思路 1、遞歸 #案例1-最優路測路線 題目描述 評估一個網絡的信號質量&#xff0c;其中一個做法是將網絡劃分為柵格&#xff0c;然后對每個柵格的信號質量計算。 路測的時候&#xff0c;希望選擇一條信號最好的路線&#xff08;彼此相連的柵格集合&#x…

12種增強Python代碼的函數式編程技術

前言 什么是函數式編程&#xff1f; 一句話總結&#xff1a;函數式編程(functional programming)是一種編程范式&#xff0c;之外還有面向對象&#xff08;OOP&#xff09;、面向過程、邏輯式編程等。 函數式編程是一種高度抽象的編程范式&#xff0c;它倡導使用純函數&#x…

算法·二分

二分枚舉 適用條件&#xff1a; 答案有明顯上下界答案具有單調性:a滿足,若b>a可以知b必定滿足。本質上是枚舉的對數優化 思維技巧 解決問題->>驗證答案,明顯前者比后者更加困難若題目有最大值最小&#xff0c;最小值最大這種經典條件&#xff0c;隱含著答案有界 …

Docker-11☆ Docker Compose部署RuoYi-Cloud

一、環境準備 1.安裝Docker 附:Docker-02-01☆ Docker在線下載安裝與配置(linux) 2.安裝Docker Compose 附:Docker-10☆ Docker Compose 二、源碼下載 若依官網:RuoYi 若依官方網站 鼠標放到"源碼地址"上,點擊"RuoYi-Cloud 微服務版"。 跳轉至G…

深入理解計算機系統 CSAPP 家庭作業8.22

書本知識夠你寫出答案,但是如果你想驗證你寫的答案,就要一些額外的東西.這本書很多題目都是如此 /** mysystem.c*/ #include <stdio.h> #include "csapp.h"int mysystem(char* command) {pid_t pid;int status;if ((pid Fork()) 0) {/*這里是關鍵用子程序去…

新加坡工作和生活指北:工作篇

文章首發于公眾號&#xff1a;Keegan小鋼 一年多以前&#xff08;2022 年 8 月初&#xff09;&#xff0c;那時我過來新加坡才 4 個多月&#xff0c;就寫了篇文章分享了當時在新加坡的生活和工作體驗。文章得到的反響不錯&#xff0c;但也反饋出了一些新的問題&#xff0c;比如…

預訓練對齊:數學理論到工程實踐的橋梁

在人工智能和機器學習領域&#xff0c;預訓練模型的對齊是一個至關重要的概念。本篇博客源自聽了一場黃民烈老師關于大模型對齊的分享&#xff0c;整理內容如下&#xff0c;供大家參考。 數學理論中的預訓練對齊 數學理論上&#xff0c;預訓練對齊是什么&#xff1f; 序列…