Skip to content

Commit

Permalink
Added dataflow-specific PageQueryProvider for missing functions
Browse files Browse the repository at this point in the history
Spring Batch removed some methods that are still required by Data Flow.
Created Data Flow versions of those classes so that we can implement those methods.

Added hibernate version

Checkpoint for server core and server

Updated Skipper components

Update classic docs to be Spring REST Docs 3.0 compliant
  • Loading branch information
cppwfs committed Feb 1, 2024
1 parent 15712ec commit 089dadb
Show file tree
Hide file tree
Showing 74 changed files with 865 additions and 514 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
<module>spring-cloud-starter-dataflow-server</module>
<module>spring-cloud-starter-dataflow-ui</module>
<module>spring-cloud-dataflow-server</module>
<module>spring-cloud-dataflow-tasklauncher</module>
<module>spring-cloud-dataflow-single-step-batch-job</module>
<module>spring-cloud-dataflow-composed-task-runner</module>
<!-- <module>spring-cloud-dataflow-tasklauncher</module>-->
<!-- <module>spring-cloud-dataflow-single-step-batch-job</module>-->
<!-- <module>spring-cloud-dataflow-composed-task-runner</module>-->
<module>spring-cloud-dataflow-test</module>
<module>spring-cloud-dataflow-dependencies</module>
<module>spring-cloud-dataflow-classic-docs</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.springframework.restdocs.payload.PayloadDocumentation.subsectionWithPath;
import static org.springframework.restdocs.request.RequestDocumentation.parameterWithName;
import static org.springframework.restdocs.request.RequestDocumentation.pathParameters;
import static org.springframework.restdocs.request.RequestDocumentation.requestParameters;
import static org.springframework.restdocs.request.RequestDocumentation.queryParameters;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
Expand Down Expand Up @@ -83,7 +83,7 @@ public void registeringAnApplicationVersion() throws Exception {
parameterWithName("name").description("The name of the application to register"),
parameterWithName("version").description("The version of the application to register")
),
requestParameters(
queryParameters(
parameterWithName("uri").description("URI where the application bits reside"),
parameterWithName("metadata-uri").optional()
.description("URI where the application metadata jar can be found"),
Expand All @@ -108,7 +108,7 @@ public void bulkRegisteringApps() throws Exception {
.andExpect(status().isCreated())
.andDo(
this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("uri").optional().description("URI where a properties file containing registrations can be fetched. Exclusive with `apps`."),
parameterWithName("apps").optional().description("Inline set of registrations. Exclusive with `uri`."),
parameterWithName("force").optional().description("Must be true if a registration with the same name and type already exists, otherwise an error will occur")
Expand All @@ -133,7 +133,7 @@ public void getApplicationsFiltered() throws Exception {
)
.andExpect(status().isOk())
.andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("search").description("The search string performed on the name (optional)"),
parameterWithName("type")
.description("Restrict the returned apps to the type of the app. One of " + Arrays.asList(ApplicationType.values())),
Expand Down Expand Up @@ -167,7 +167,7 @@ public void getSingleApplication() throws Exception {
parameterWithName("type").description("The type of application to query. One of " + Arrays.asList(ApplicationType.values())),
parameterWithName("name").description("The name of the application to query")
),
requestParameters(
queryParameters(
parameterWithName("exhaustive").optional()
.description("Return all application properties, including common Spring Boot properties")
),
Expand Down Expand Up @@ -205,7 +205,7 @@ public void registeringAnApplication() throws Exception {
parameterWithName("type").description("The type of application to register. One of " + Arrays.asList(ApplicationType.values())),
parameterWithName("name").description("The name of the application to register")
),
requestParameters(
queryParameters(
parameterWithName("uri").description("URI where the application bits reside"),
parameterWithName("metadata-uri").optional().description("URI where the application metadata jar can be found"),
parameterWithName("bootVersion").optional().description("The Spring Boot version of the application. Default is 2"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.springframework.restdocs.payload.PayloadDocumentation.subsectionWithPath;
import static org.springframework.restdocs.request.RequestDocumentation.parameterWithName;
import static org.springframework.restdocs.request.RequestDocumentation.pathParameters;
import static org.springframework.restdocs.request.RequestDocumentation.requestParameters;
import static org.springframework.restdocs.request.RequestDocumentation.queryParameters;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -80,7 +80,7 @@ public void listAllAuditRecords() throws Exception {
.andDo(print())
.andExpect(status().isOk())
.andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page").description("The zero-based page number (optional)"),
parameterWithName("size").description("The requested page size (optional)"),
parameterWithName("operations").description("Comma-separated list of Audit Operations (optional)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.dataflow.server.rest.documentation;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -32,14 +33,7 @@
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.core.Launcher;
import org.springframework.cloud.dataflow.core.TaskPlatform;
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.controller.TaskSchedulerController;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDao;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionMetadataDaoContainer;
import org.springframework.cloud.dataflow.server.repository.JdbcDataflowTaskExecutionMetadataDao;
import org.springframework.cloud.dataflow.server.repository.support.SchemaUtilities;
import org.springframework.cloud.dataflow.server.service.SchedulerService;
import org.springframework.cloud.dataflow.server.single.LocalDataflowResource;
import org.springframework.cloud.deployer.spi.app.ActuatorOperations;
Expand Down Expand Up @@ -205,28 +199,6 @@ void destroyStream(String name) throws Exception {
);
}

protected DataflowTaskExecutionMetadataDaoContainer createDataFlowTaskExecutionMetadataDaoContainer(SchemaService schemaService) {
DataflowTaskExecutionMetadataDaoContainer result = new DataflowTaskExecutionMetadataDaoContainer();
MultiSchemaIncrementerFactory incrementerFactory = new MultiSchemaIncrementerFactory(dataSource);
String databaseType;
try {
databaseType = DatabaseType.fromMetaData(dataSource).name();
} catch (MetaDataAccessException e) {
throw new IllegalStateException(e);
}
for (SchemaVersionTarget target : schemaService.getTargets().getSchemas()) {
DataflowTaskExecutionMetadataDao dao = new JdbcDataflowTaskExecutionMetadataDao(
dataSource,
incrementerFactory.getIncrementer(databaseType,
SchemaUtilities.getQuery("%PREFIX%EXECUTION_METADATA_SEQ", target.getTaskPrefix())
),
target.getTaskPrefix()
);
result.add(target.getName(), dao);
}
return result;
}

/**
* A {@link ResultHandler} that can be turned off and on.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.dataflow.server.rest.documentation;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -29,7 +30,10 @@
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport;
Expand Down Expand Up @@ -58,7 +62,7 @@
import static org.springframework.restdocs.payload.PayloadDocumentation.subsectionWithPath;
import static org.springframework.restdocs.request.RequestDocumentation.parameterWithName;
import static org.springframework.restdocs.request.RequestDocumentation.pathParameters;
import static org.springframework.restdocs.request.RequestDocumentation.requestParameters;
import static org.springframework.restdocs.request.RequestDocumentation.queryParameters;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -128,7 +132,7 @@ public void listJobExecutions() throws Exception {
.param("size", "10"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -149,7 +153,7 @@ public void listThinJobExecutions() throws Exception {
.param("size", "10"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -171,7 +175,7 @@ public void listThinJobExecutionsByJobInstanceId() throws Exception {
.param("jobInstanceId", "1"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -195,7 +199,7 @@ public void listThinJobExecutionsByTaskExecutionId() throws Exception {
.param("taskExecutionId", "1"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -220,7 +224,7 @@ public void listThinJobExecutionsByDate() throws Exception {
.param("toDate", "2050-09-24T18:00:45,000"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -246,7 +250,7 @@ public void listJobExecutionsByName() throws Exception {
.param("size", "10"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -270,7 +274,7 @@ public void listThinJobExecutionsByName() throws Exception {
.param("size", "10"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -297,7 +301,7 @@ public void jobDisplayDetail() throws Exception {
pathParameters(
parameterWithName("id").description("The id of an existing job execution (required)")
),
requestParameters(
queryParameters(
parameterWithName("schemaTarget").description("Schema Target to the Job.").optional()
),
responseFields(
Expand Down Expand Up @@ -337,7 +341,7 @@ public void jobStop() throws Exception {
.andDo(this.documentationHandler.document(
pathParameters(parameterWithName("id")
.description("The id of an existing job execution (required)"))
, requestParameters(
, queryParameters(
parameterWithName("schemaTarget").description("The schema target of the job execution").optional(),
parameterWithName("stop")
.description("Sends signal to stop the job if set to true"))));
Expand All @@ -354,7 +358,7 @@ public void jobRestart() throws Exception {
.andDo(this.documentationHandler.document(
pathParameters(parameterWithName("id")
.description("The id of an existing job execution (required)"))
, requestParameters(
, queryParameters(
parameterWithName("schemaTarget").description("The schema target of the job execution").optional(),
parameterWithName("restart")
.description("Sends signal to restart the job if set to true")
Expand All @@ -373,18 +377,18 @@ private void initialize() {

}

private void createJobExecution(String name, BatchStatus status) {
private void createJobExecution(String name, BatchStatus status) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, new Date(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null);
Map<String, JobParameter> jobParameterMap = new HashMap<>();
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), Collections.singletonList("--spring.cloud.data.flow.platformname=default"), null);
Map<String, JobParameter<?>> jobParameterMap = new HashMap<>();
JobParameters jobParameters = new JobParameters(jobParameterMap);
JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName());
JobExecution jobExecution = jobRepository.createJobExecution(jobRepository.createJobInstance(name, new JobParameters()), jobParameters, null);
JobExecution jobExecution = jobRepository.createJobExecution(name, jobParameters);
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(new Date());
jobExecution.setStartTime(LocalDateTime.now());
jobRepository.update(jobExecution);
final TaskManifest manifest = new TaskManifest();
manifest.setPlatformName("default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package org.springframework.cloud.dataflow.server.rest.documentation;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -26,7 +26,10 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport;
Expand All @@ -49,7 +52,7 @@
import static org.springframework.restdocs.payload.PayloadDocumentation.subsectionWithPath;
import static org.springframework.restdocs.request.RequestDocumentation.parameterWithName;
import static org.springframework.restdocs.request.RequestDocumentation.pathParameters;
import static org.springframework.restdocs.request.RequestDocumentation.requestParameters;
import static org.springframework.restdocs.request.RequestDocumentation.queryParameters;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -93,7 +96,7 @@ public void listJobInstances() throws Exception {
.param("size", "10"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
queryParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
Expand All @@ -117,7 +120,7 @@ public void jobDisplayDetail() throws Exception {
pathParameters(
parameterWithName("id").description("The id of an existing job instance (required)")
),
requestParameters(
queryParameters(
parameterWithName("schemaTarget").description("Schema target").optional()
),
responseFields(
Expand All @@ -138,16 +141,16 @@ private void initialize() {
this.taskBatchDaoContainer = context.getBean(TaskBatchDaoContainer.class);
}

private void createJobExecution(String name, BatchStatus status) {
private void createJobExecution(String name, BatchStatus status) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException {
SchemaVersionTarget schemaVersionTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(name, taskDefinitionReader);
TaskExecutionDao dao = this.daoContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = dao.createTaskExecution(name, new Date(), new ArrayList<>(), null);
TaskExecution taskExecution = dao.createTaskExecution(name, LocalDateTime.now(), new ArrayList<>(), null);
JobRepository jobRepository = this.jobRepositoryContainer.get(schemaVersionTarget.getName());
JobExecution jobExecution = jobRepository.createJobExecution(jobRepository.createJobInstance(name, new JobParameters()), new JobParameters(), null);
JobExecution jobExecution = jobRepository.createJobExecution(name, new JobParameters());
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersionTarget.getName());
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(new Date());
jobExecution.setStartTime(LocalDateTime.now());
jobRepository.update(jobExecution);
}
}
Loading

0 comments on commit 089dadb

Please sign in to comment.