Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ service JobService {

// Get ingestion health metrics for a Feature Table
rpc GetHealthMetrics (GetHealthMetricsRequest) returns (GetHealthMetricsResponse);

// List batch ingestion job record for a Feature Table
rpc ListBatchJobRecords(ListBatchJobRecordsRequest) returns (ListBatchJobRecordsResponse);
}


Expand Down Expand Up @@ -116,6 +119,29 @@ message Job {
string project = 11;
}

message BatchJobRecord {
string id = 1;
string job_id = 2;
// Type of the Job
JobType type = 3;
// Current job status
JobStatus status = 4;
google.protobuf.Timestamp job_start_time = 5;
google.protobuf.Timestamp job_end_time = 6;
message OfflineToOnlineMeta {
string table_name = 1;
string project = 2;
google.protobuf.Timestamp start_time_param = 3;
google.protobuf.Timestamp end_time_param = 4;
}
// JobType specific metadata on the job
oneof meta {
OfflineToOnlineMeta batch_ingestion = 7;
// TODO: add RetrievalJobMeta
}
string spark_app_manifest = 8;
}

// Ingest data from offline store into online store
message StartOfflineToOnlineIngestionJobRequest {
// Feature table to ingest
Expand Down Expand Up @@ -269,3 +295,24 @@ message GetHealthMetricsResponse {
repeated string passed = 1;
repeated string failed = 2;
}

message ListBatchJobRecordsRequest {
// Project name
string project = 1;

// Feature table name, required for JOB_TYPE=BATCH_INGESTION_JOB
string table_name = 2;

// optional field
JobType type = 3;

// Optional, Time range for the records to be listed. Defaults to preconfigured range
google.protobuf.Timestamp end = 4;

// Optional, defaults to current time
google.protobuf.Timestamp start = 5;
}

message ListBatchJobRecordsResponse {
repeated BatchJobRecord records = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@EnableAsync
public class CaraMLRegistry {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JobGrpcServiceImpl extends JobServiceGrpc.JobServiceImplBase {
@Autowired
public JobGrpcServiceImpl(JobService jobService) {
this.jobService = jobService;
this.jobService.startWatcher();
}

@Override
Expand Down Expand Up @@ -158,4 +159,21 @@ public void unscheduleJob(
responseObserver.onNext(UnscheduleJobResponse.getDefaultInstance());
responseObserver.onCompleted();
}

@Override
public void listBatchJobRecords(
JobServiceProto.ListBatchJobRecordsRequest request,
StreamObserver<JobServiceProto.ListBatchJobRecordsResponse> responseObserver) {
List<JobServiceProto.BatchJobRecord> records =
jobService.listBatchJobRecords(
request.getProject(),
request.getType(),
request.getTableName(),
request.getStart().getSeconds(),
request.getEnd().getSeconds());
JobServiceProto.ListBatchJobRecordsResponse response =
JobServiceProto.ListBatchJobRecordsResponse.newBuilder().addAllRecords(records).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import dev.caraml.store.protobuf.core.FeatureProto;
import dev.caraml.store.protobuf.core.FeatureTableProto;
import dev.caraml.store.protobuf.core.FeatureTableProto.FeatureTableSpec;
import dev.caraml.store.sparkjob.BatchJobRecord;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -111,6 +112,9 @@ public class FeatureTable extends AbstractTimestampEntity {
@JoinColumn(name = "online_store_name")
private OnlineStore onlineStore;

@OneToMany(fetch = FetchType.LAZY, mappedBy = "featureTable")
private Set<BatchJobRecord> batchJobRecords;

public FeatureTable() {}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package dev.caraml.store.sparkjob;

import dev.caraml.store.feature.FeatureTable;
import dev.caraml.store.protobuf.jobservice.JobServiceProto;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.JobType;
import dev.caraml.store.sparkjob.crd.SparkApplication;
import java.sql.Timestamp;
import java.util.List;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.tuple.Pair;

@Getter
@Entity
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Table(name = "batch_job_records")
public class BatchJobRecord {

@Column(name = "id")
@Id
private String id;

private String ingestionJobId;

@Column(name = "job_type", nullable = false)
// The type of the ingestion job (e.g., BATCH, RETRIEVAL)
private JobType jobType;

// The project associated with the ingestion job
@Column(name = "project", nullable = false)
private String project;

// The feature table associated with the ingestion job
@ManyToOne(optional = true)
private FeatureTable featureTable;

// The status of the ingestion job (e.g., RUNNING, COMPLETED, FAILED)
@Column(name = "status")
private String status;

// The start time of the ingestion job
@Column(name = "job_end_time")
private long jobEndTime;

// The end time of the ingestion job
@Column(name = "job_start_time")
private long jobStartTime;

// start parameters passed into spark job
@Column(name = "start_time")
private Timestamp ingestionJobStartTimeParam;

// end parameters passed into spark job
@Column(name = "end_time")
private Timestamp ingestionJobEndTimeParam;

// // Any error message associated with the ingestion job
// private String errorMessage;

@Column(name = "spark_application_manifest", columnDefinition = "text")
private String sparkApplicationManifest;

public static Pair<Timestamp, Timestamp> getStartEndTimeParamsFromSparkApplication(
SparkApplication sparkApplication) {

/*
* abstract
*
* - --start
* - "2025-05-17T00:00:00Z"
* - --end
* - "2025-05-18T00:00:00Z"
*/
List<String> input = sparkApplication.getSpec().getArguments();
String startTime = null;
String endTime = null;
String ingestionTimespan = null;

for (int i = 0; i < input.size() - 1; i++) {
if (input.get(i).equals("--start")) {
startTime = input.get(i + 1);
} else if (input.get(i).equals("--end")) {
endTime = input.get(i + 1);
} else if (input.get(i).equals("--ingestion-timespan")) {
ingestionTimespan = input.get(i + 1);
}
}
if (startTime == null || endTime == null) {
// check for ingestion timespan
// TODO: decide how to handle this case
if (ingestionTimespan != null) {
// If ingestion timespan is provided, we can derive start and end time
// For example, if ingestionTimespan is "1d", we can set startTime to 1 day ago
// and endTime to now.
long currentTimeMillis = System.currentTimeMillis();
long ingestionTimespanMillis =
1000
* 24
* 60
* 60
* Long.parseLong(ingestionTimespan); // integerTimespan in milliseconds
startTime = new Timestamp(currentTimeMillis - ingestionTimespanMillis).toString();
endTime = new Timestamp(currentTimeMillis).toString();
} else {
throw new IllegalArgumentException("Start and end time parameters are missing.");
}
}
return Pair.of(
Timestamp.valueOf(startTime.replace("T", " ").replace("Z", "")),
Timestamp.valueOf(endTime.replace("T", " ").replace("Z", "")));
}

public static JobServiceProto.BatchJobRecord toProto(BatchJobRecord record) {
if (record.getJobType() == JobType.BATCH_INGESTION_JOB) {
JobServiceProto.BatchJobRecord.OfflineToOnlineMeta meta =
JobServiceProto.BatchJobRecord.OfflineToOnlineMeta.newBuilder()
.setTableName(record.getFeatureTable().getName())
.setProject(record.getProject())
.setStartTimeParam(
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(record.getIngestionJobStartTimeParam().getTime() / 1000)
.build())
.setEndTimeParam(
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(record.getIngestionJobEndTimeParam().getTime() / 1000)
.build())
.build();
return JobServiceProto.BatchJobRecord.newBuilder()
.setId(record.getId())
.setJobId(record.getIngestionJobId())
.setType(record.getJobType())
.setBatchIngestion(meta)
.setStatus(JobServiceProto.JobStatus.valueOf(record.getStatus()))
.setJobStartTime(
com.google.protobuf.Timestamp.newBuilder().setSeconds(record.jobStartTime).build())
.setJobEndTime(
com.google.protobuf.Timestamp.newBuilder().setSeconds(record.jobEndTime).build())
.setSparkAppManifest(record.getSparkApplicationManifest())
.build();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dev.caraml.store.sparkjob;

import dev.caraml.store.feature.FeatureTable;
import dev.caraml.store.protobuf.jobservice.JobServiceProto;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;

public interface BatchJobRecordRepository extends JpaRepository<BatchJobRecord, String> {
List<BatchJobRecord>
findAllByJobTypeAndProjectAndFeatureTableAndJobStartTimeBetweenOrderByJobStartTimeDesc(
JobServiceProto.JobType jobType,
String project,
FeatureTable featureTable,
long startTime,
long endTime);
}
Loading
Loading