Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions byoc/job_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (bsg *BYOCGatewayServer) submitJob(ctx context.Context, w http.ResponseWrit
continue
}

gatewayBalance := updateGatewayBalance(bsg.node, orchToken, gatewayJob.Job.Req.Capability, time.Since(start))
gatewayBalance := updateGatewayBalance(bsg.node, orchToken, gatewayJob.Job.Req.ID, time.Since(start))
clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v balance_from_orch=%v", time.Since(start), gatewayBalance.FloatString(0), orchBalance)
w.Write(data)
return
Expand Down Expand Up @@ -186,7 +186,7 @@ func (bsg *BYOCGatewayServer) submitJob(ctx context.Context, w http.ResponseWrit
}
}

gatewayBalance := updateGatewayBalance(bsg.node, orchToken, gatewayJob.Job.Req.Capability, time.Since(start))
gatewayBalance := updateGatewayBalance(bsg.node, orchToken, gatewayJob.Job.Req.ID, time.Since(start))

clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v balance_from_orch=%v", time.Since(start), gatewayBalance.FloatString(0), orchBalance.FloatString(0))
}
Expand Down Expand Up @@ -468,14 +468,21 @@ func genOrchestratorReq(b common.Broadcaster) (*net.OrchestratorRequest, error)
return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: sig}, nil
}

func getToken(ctx context.Context, respTimeout time.Duration, orchUrl, capability, sender, senderSig string) (*JobToken, error) {
// getToken fetches a JobToken from an Orchestrator. If jobID is non-empty,
// the orchestrator will return the per-job balance keyed by that ID — used
// when refreshing tokens for an existing stream so the gateway can
// reconcile its local balance with the orchestrator's view.
func getToken(ctx context.Context, respTimeout time.Duration, orchUrl, capability, jobID, sender, senderSig string) (*JobToken, error) {
start := time.Now()
tokenReq, err := http.NewRequestWithContext(ctx, "GET", orchUrl+"/process/token", nil)
jobSender := JobSender{Addr: sender, Sig: senderSig}

reqSenderStr, _ := json.Marshal(jobSender)
tokenReq.Header.Set(jobEthAddressHdr, base64.StdEncoding.EncodeToString(reqSenderStr))
tokenReq.Header.Set(jobCapabilityHdr, capability)
if jobID != "" {
tokenReq.Header.Set(jobIdHdr, jobID)
}
if err != nil {
clog.Errorf(ctx, "Failed to create request for Orchestrator to verify job token request err=%v", err)
return nil, err
Expand Down
74 changes: 45 additions & 29 deletions byoc/job_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,19 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler {
return
}

capBal := orch.Balance(senderAddr, core.ManifestID(jobCapsHdr))
// Balances are keyed per-job. If the gateway is refreshing a token
// for an existing job/stream it sends Livepeer-Job-Id; we look up
// the per-job balance against that key. For initial token requests
// no job exists yet, so balance is 0.
jobIDHdr := r.Header.Get(jobIdHdr)
var capBal *big.Rat
if jobIDHdr != "" {
capBal = orch.Balance(senderAddr, core.ManifestID(jobIDHdr))
}
if capBal != nil {
capBal, err = common.PriceToInt64(capBal)
if err != nil {
clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v job_id=%v err=%v", senderAddr.Hex(), jobIDHdr, err.Error())
capBal = big.NewRat(0, 1)
}
} else {
Expand All @@ -183,7 +191,7 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler {
//convert to int64. Note: returns with 000 more digits to allow for precision of 3 decimal places.
capBalInt, err := common.PriceToFixed(capBal)
if err != nil {
glog.Errorf("could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
glog.Errorf("could not convert balance to int64 sender=%v job_id=%v err=%v", senderAddr.Hex(), jobIDHdr, err.Error())
capBalInt = 0
} else {
// Remove the last three digits from capBalInt
Expand Down Expand Up @@ -291,8 +299,8 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
bso.orch.RemoveExternalCapability(orchJob.Req.Capability)
}

bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
http.Error(w, fmt.Sprintf("job not able to be processed, removing capability err=%v", err.Error()), http.StatusInternalServerError)
return
}
Expand All @@ -301,8 +309,8 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
if resp.StatusCode == http.StatusUnauthorized {
clog.Errorf(ctx, "received 401 Unauthorized from worker, removing capability %v", orchJob.Req.Capability)
bso.orch.RemoveExternalCapability(orchJob.Req.Capability)
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
http.Error(w, "job not able to be processed, removing capability err=worker auth token failed", http.StatusInternalServerError)
return
}
Expand All @@ -322,8 +330,8 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
if err != nil {
clog.Errorf(ctx, "Unable to read response err=%v", err)

bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -332,16 +340,16 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
if resp.StatusCode > 399 {
clog.Errorf(ctx, "error processing request err=%v ", string(data))

bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
//return error response from the worker
http.Error(w, string(data), resp.StatusCode)
return
}

bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v", time.Since(start), bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v", time.Since(start), bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
w.Write(data)
//request completed and returned a response

Expand All @@ -354,15 +362,15 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
//send payment balance back so client can determine if payment is needed
bso.addPaymentBalanceHeader(w, orchJob.Sender, orchJob.Req.Capability)
bso.addPaymentBalanceHeader(w, orchJob.Sender, orchJob.Req.ID)

// Flush to ensure data is sent immediately
flusher, ok := w.(http.Flusher)
if !ok {
clog.Errorf(ctx, "streaming not supported")

bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.Capability)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
bso.chargeForCompute(start, orchJob.JobPrice, orchJob.Sender, orchJob.Req.ID)
w.Header().Set(jobPaymentBalanceHdr, bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
Expand All @@ -378,7 +386,7 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
for scanner.Scan() {
select {
case <-respCtx.Done():
orchBal := orch.Balance(orchJob.Sender, core.ManifestID(orchJob.Req.Capability))
orchBal := orch.Balance(orchJob.Sender, core.ManifestID(orchJob.Req.ID))
if orchBal == nil {
orchBal = big.NewRat(0, 1)
}
Expand All @@ -388,7 +396,7 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
default:
line := scanner.Text()
if strings.Contains(line, "[DONE]") {
orchBal := orch.Balance(orchJob.Sender, core.ManifestID(orchJob.Req.Capability))
orchBal := orch.Balance(orchJob.Sender, core.ManifestID(orchJob.Req.ID))
if orchBal == nil {
orchBal = big.NewRat(0, 1)
}
Expand All @@ -412,8 +420,8 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
//skips if price is 0
jobPriceRat := big.NewRat(orchJob.JobPrice.PricePerUnit, orchJob.JobPrice.PixelsPerUnit)
if jobPriceRat.Cmp(big.NewRat(0, 1)) > 0 {
bso.orch.DebitFees(orchJob.Sender, core.ManifestID(orchJob.Req.Capability), orchJob.JobPrice, 5)
senderBalance := bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability)
bso.orch.DebitFees(orchJob.Sender, core.ManifestID(orchJob.Req.ID), orchJob.JobPrice, 5)
senderBalance := bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID)
if senderBalance != nil {
if senderBalance.Cmp(big.NewRat(0, 1)) < 0 {
w.Write([]byte("event: insufficient balance\n"))
Expand All @@ -433,7 +441,7 @@ func (bso *BYOCOrchestratorServer) processJob(ctx context.Context, w http.Respon
}

//capacity released with defer stmt above
clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v", time.Since(start), bso.getPaymentBalance(orchJob.Sender, orchJob.Req.Capability).FloatString(0))
clog.V(common.SHORT).Infof(ctx, "Job processed successfully took=%v balance=%v", time.Since(start), bso.getPaymentBalance(orchJob.Sender, orchJob.Req.ID).FloatString(0))
}
}

Expand Down Expand Up @@ -461,7 +469,7 @@ func (bso *BYOCOrchestratorServer) setupOrchJob(ctx context.Context, r *http.Req
return nil, errors.New("Could not get job price")
}

pmtErr := bso.confirmPayment(ctx, sender, jobReq.Capability, jobPrice, r.Header.Get(jobPaymentHeaderHdr))
pmtErr := bso.confirmPayment(ctx, sender, jobReq.ID, jobReq.Capability, jobPrice, r.Header.Get(jobPaymentHeaderHdr))
if pmtErr != nil {
orch.FreeExternalCapabilityCapacity(jobReq.Capability)
return nil, pmtErr
Expand All @@ -478,7 +486,13 @@ func (bso *BYOCOrchestratorServer) setupOrchJob(ctx context.Context, r *http.Req
return &orchJob{Req: jobReq, Sender: sender, JobPrice: jobPrice, Details: &jobDetails}, nil
}

func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender ethcommon.Address, capability string, jobPrice *net.PriceInfo, paymentHdr string) error {
// confirmPayment validates and applies any payment present in the request,
// and verifies the sender has at least a minimum balance for the job.
//
// jobID is used as the ManifestID for balance keying — each job/stream gets
// an isolated balance bucket. capability is only used for capacity-slot
// management on payment failure (FreeExternalCapabilityCapacity).
func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender ethcommon.Address, jobID, capability string, jobPrice *net.PriceInfo, paymentHdr string) error {

clog.V(common.DEBUG).Infof(ctx, "job price=%v units=%v", jobPrice.PricePerUnit, jobPrice.PixelsPerUnit)

Expand All @@ -488,7 +502,7 @@ func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender et
if jobPriceRat.Cmp(big.NewRat(0, 1)) > 0 {
minBal := new(big.Rat).Mul(jobPriceRat, big.NewRat(60, 1)) //minimum 1 minute balance
//process payment if included
orchBal, pmtErr := bso.processPayment(ctx, sender, capability, paymentHdr)
orchBal, pmtErr := bso.processPayment(ctx, sender, jobID, capability, paymentHdr)
if pmtErr != nil {
//log if there are payment errors but continue, balance will runout and clean up
clog.Infof(ctx, "job payment error: %v", pmtErr)
Expand All @@ -502,22 +516,24 @@ func (bso *BYOCOrchestratorServer) confirmPayment(ctx context.Context, sender et
return nil
}

// process payment and return balance
func (bso *BYOCOrchestratorServer) processPayment(ctx context.Context, sender ethcommon.Address, capability string, paymentHdr string) (*big.Rat, error) {
// processPayment processes the ticket payment (if any) keyed by jobID and
// returns the resulting per-job balance. capability is used only to release
// the capacity slot on failure.
func (bso *BYOCOrchestratorServer) processPayment(ctx context.Context, sender ethcommon.Address, jobID, capability string, paymentHdr string) (*big.Rat, error) {
if paymentHdr != "" {
payment, err := getPayment(paymentHdr)
if err != nil {
clog.Errorf(ctx, "job payment invalid: %v", err)
return nil, errPaymentError
}

if err := bso.orch.ProcessPayment(ctx, payment, core.ManifestID(capability)); err != nil {
if err := bso.orch.ProcessPayment(ctx, payment, core.ManifestID(jobID)); err != nil {
bso.orch.FreeExternalCapabilityCapacity(capability)
clog.Errorf(ctx, "Error processing payment: %v", err)
return nil, errPaymentError
}
}
orchBal := bso.getPaymentBalance(sender, capability)
orchBal := bso.getPaymentBalance(sender, jobID)

return orchBal, nil

Expand Down
2 changes: 1 addition & 1 deletion byoc/job_orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func TestProcessPayment(t *testing.T) {
}

before := orch.Balance(sender, core.ManifestID(tc.capability)).FloatString(0)
bal, err := bso.processPayment(ctx, sender, tc.capability, testPmtHdr)
bal, err := bso.processPayment(ctx, sender, tc.capability, tc.capability, testPmtHdr)
after := orch.Balance(sender, core.ManifestID(tc.capability)).FloatString(0)
t.Logf("Balance before: %s, after: %s", before, after)
assert.NoError(t, err)
Expand Down
33 changes: 18 additions & 15 deletions byoc/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,21 @@ func (bsg *BYOCGatewayServer) createPayment(ctx context.Context, jobReq *JobRequ
orchAddr := ethcommon.BytesToAddress(orchToken.TicketParams.Recipient)
sessionID := bsg.node.Sender.StartSession(*pmTicketParams(orchToken.TicketParams))

//setup balances and update Gateway balance to Orchestrator balance, log differences
//Orchestrator tracks balance paid and will not perform work if the balance it
//has is not sufficient
// Balances are keyed per-job (ManifestID = jobReq.ID) so each job/stream
// gets an isolated balance bucket — no aggregation at the sender or
// capability level. This mirrors how live-video-to-video uses a fresh
// ManifestID per stream (see server/ai_process.go clearSessionBalance).
orchBal := big.NewRat(orchToken.Balance, 1)
price := big.NewRat(orchToken.Price.PricePerUnit, orchToken.Price.PixelsPerUnit)
cost := new(big.Rat).Mul(price, big.NewRat(int64(jobReq.Timeout), 1))
minBal := new(big.Rat).Mul(price, big.NewRat(120, 1)) //minimum 2 minute balance, Orchestrator requires 1 minute. Use 2 to have a buffer.
balance, diffToOrch, minBalCovered, resetToZero := compareAndUpdateBalance(bsg, orchAddr, jobReq.Capability, orchBal, minBal)
balance, diffToOrch, minBalCovered, resetToZero := compareAndUpdateBalance(bsg, orchAddr, jobReq.ID, orchBal, minBal)

if diffToOrch.Sign() != 0 {
clog.Infof(ctx, "Updated balance for sender=%v capability=%v by %v to match Orchestrator reported balance %v", sender.Hex(), jobReq.Capability, diffToOrch.FloatString(3), orchBal.FloatString(3))
clog.Infof(ctx, "Updated balance for sender=%v job_id=%v capability=%v by %v to match Orchestrator reported balance %v", sender.Hex(), jobReq.ID, jobReq.Capability, diffToOrch.FloatString(3), orchBal.FloatString(3))
}
if resetToZero {
clog.Infof(ctx, "Reset balance to zero for to match Orchestrator reported balance sender=%v capability=%v", sender.Hex(), jobReq.Capability)
clog.Infof(ctx, "Reset balance to zero to match Orchestrator reported balance sender=%v job_id=%v capability=%v", sender.Hex(), jobReq.ID, jobReq.Capability)
}
if minBalCovered {
createTickets = false
Expand All @@ -110,7 +111,7 @@ func (bsg *BYOCGatewayServer) createPayment(ctx context.Context, jobReq *JobRequ
ExpectedPrice: orchToken.Price,
}
}
clog.V(common.DEBUG).Infof(ctx, "current balance for sender=%v capability=%v is %v, cost=%v price=%v", sender.Hex(), jobReq.Capability, balance.FloatString(3), cost.FloatString(3), price.FloatString(3))
clog.V(common.DEBUG).Infof(ctx, "current balance for sender=%v job_id=%v capability=%v is %v, cost=%v price=%v", sender.Hex(), jobReq.ID, jobReq.Capability, balance.FloatString(3), cost.FloatString(3), price.FloatString(3))

if !createTickets {
clog.V(common.DEBUG).Infof(ctx, "No payment required, using balance=%v", balance.FloatString(3))
Expand All @@ -134,7 +135,7 @@ func (bsg *BYOCGatewayServer) createPayment(ctx context.Context, jobReq *JobRequ
fv := big.NewRat(tickets.FaceValue.Int64(), 1)
pmtTotal := new(big.Rat).Mul(fv, winProb)
pmtTotal = new(big.Rat).Mul(pmtTotal, big.NewRat(int64(ticketCnt), 1))
bsg.node.Balances.Credit(orchAddr, core.ManifestID(jobReq.Capability), pmtTotal)
bsg.node.Balances.Credit(orchAddr, core.ManifestID(jobReq.ID), pmtTotal)
//create the payment
payment = &net.Payment{
Sender: sender.Bytes(),
Expand All @@ -157,13 +158,14 @@ func (bsg *BYOCGatewayServer) createPayment(ctx context.Context, jobReq *JobRequ
payment.TicketSenderParams = senderParams

ratPrice, _ := common.RatPriceInfo(payment.ExpectedPrice)
balanceForOrch := bsg.node.Balances.Balance(orchAddr, core.ManifestID(jobReq.Capability))
balanceForOrch := bsg.node.Balances.Balance(orchAddr, core.ManifestID(jobReq.ID))
balanceForOrchStr := ""
if balanceForOrch != nil {
balanceForOrchStr = balanceForOrch.FloatString(3)
}

clog.V(common.DEBUG).Infof(ctx, "Created new payment - capability=%v recipient=%v faceValue=%v winProb=%v price=%v numTickets=%v balance=%v",
clog.V(common.DEBUG).Infof(ctx, "Created new payment - job_id=%v capability=%v recipient=%v faceValue=%v winProb=%v price=%v numTickets=%v balance=%v",
jobReq.ID,
jobReq.Capability,
tickets.Recipient.Hex(),
eth.FormatUnits(tickets.FaceValue, "ETH"),
Expand Down Expand Up @@ -202,15 +204,16 @@ func ticketCountForCost(cost *big.Rat, ticketEv *big.Rat, timeoutSeconds int64)
return int64(math.Max(0, math.Ceil(ticketCnt)))
}

func updateGatewayBalance(node *core.LivepeerNode, orchToken JobToken, capability string, took time.Duration) *big.Rat {
// updateGatewayBalance debits the gateway-side balance for a job by the cost
// of the elapsed time. The balance is keyed per-job (jobID is used as the
// ManifestID), giving each job its own isolated balance bucket.
func updateGatewayBalance(node *core.LivepeerNode, orchToken JobToken, jobID string, took time.Duration) *big.Rat {
orchAddr := ethcommon.BytesToAddress(orchToken.TicketParams.Recipient)
// update for usage of compute
orchPrice := big.NewRat(orchToken.Price.PricePerUnit, orchToken.Price.PixelsPerUnit)
cost := new(big.Rat).Mul(orchPrice, big.NewRat(int64(math.Ceil(took.Seconds())), 1))
node.Balances.Debit(orchAddr, core.ManifestID(capability), cost)
node.Balances.Debit(orchAddr, core.ManifestID(jobID), cost)

//get the updated balance
balance := node.Balances.Balance(orchAddr, core.ManifestID(capability))
balance := node.Balances.Balance(orchAddr, core.ManifestID(jobID))
if balance == nil {
return big.NewRat(0, 1)
}
Expand Down
Loading
Loading