@@ -68,83 +68,6 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
6868 }, nil
6969}
7070
71- func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
72- // Create a new channel to receive new tasks
73- internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
74-
75- // Subscribe to new tasks
76- sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
77- if errMain != nil {
78- s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
79- }
80-
81- subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
82- if errFallback != nil {
83- s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
84- }
85-
86- if errMain != nil && errFallback != nil {
87- s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
88- return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
89- }
90-
91- s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
92-
93- pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
94-
95- // Forward the new tasks to the provided channel
96- go func() {
97- defer pollLatestBatchTicker.Stop()
98- newBatchMutex := &sync.Mutex{}
99- batchesSet := make(map[[32]byte]struct{})
100- for {
101- select {
102- case newBatch := <-internalChannel:
103- s.processNewBatchV2(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
104- case <-pollLatestBatchTicker.C:
105- latestBatch, err := s.getLatestNotRespondedTaskFromEthereumV2()
106- if err != nil {
107- s.logger.Debug("Failed to get latest task from blockchain", "err", err)
108- continue
109- }
110- if latestBatch != nil {
111- s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
112- }
113- }
114- }
115-
116- }()
117-
118- // Handle errors and resubscribe
119- go func() {
120- var errMain, errFallback error
121- var auxSub, auxSubFallback event.Subscription
122- for errMain == nil || errFallback == nil { //while one is active
123- select {
124- case err := <-sub.Err():
125- s.logger.Warn("Error in new task subscription of main connection", "err", err)
126-
127- auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
128- if errMain == nil {
129- sub = auxSub // update the subscription only if it was successful
130- s.logger.Info("Main connection resubscribed to new task subscription")
131- }
132- case err := <-subFallback.Err():
133- s.logger.Warn("Error in new task subscription of fallback connection", "err", err)
134-
135- auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
136- if errFallback == nil {
137- subFallback = auxSubFallback // update the subscription only if it was successful
138- s.logger.Info("Resubscribed to fallback new task subscription")
139- }
140- }
141- }
142- errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
143- }()
144-
145- return nil
146- }
147-
14871func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
14972 // Create a new channel to receive new tasks
15073 internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
@@ -224,32 +147,6 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
224147 return nil
225148}
226149
227- func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
228- newBatchMutex.Lock()
229- defer newBatchMutex.Unlock()
230-
231- batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
232- var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
233-
234- if _, ok := batchesSet[batchIdentifierHash]; !ok {
235- s.logger.Info("Received new task",
236- "batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
237- "senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
238- "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
239-
240- batchesSet[batchIdentifierHash] = struct{}{}
241- newTaskCreatedChan <- batch
242-
243- // Remove the batch from the set after RemoveBatchFromSetInterval time
244- go func() {
245- time.Sleep(RemoveBatchFromSetInterval)
246- newBatchMutex.Lock()
247- delete(batchesSet, batchIdentifierHash)
248- newBatchMutex.Unlock()
249- }()
250- }
251- }
252-
253150func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
254151 newBatchMutex.Lock()
255152 defer newBatchMutex.Unlock()
@@ -276,56 +173,6 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
276173 }
277174}
278175
279- // getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
280- func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
281-
282- latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
283- if err != nil {
284- return nil, err
285- }
286-
287- var fromBlock uint64
288-
289- if latestBlock < BlockInterval {
290- fromBlock = 0
291- } else {
292- fromBlock = latestBlock - BlockInterval
293- }
294-
295- logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
296- if err != nil {
297- return nil, err
298- }
299-
300- var lastLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
301-
302- // Iterate over the logs until the end
303- for logs.Next() {
304- lastLog = logs.Event
305- }
306-
307- if err := logs.Error(); err != nil {
308- return nil, err
309- }
310-
311- if lastLog == nil {
312- return nil, nil
313- }
314-
315- batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
316- batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
317- state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
318- if err != nil {
319- return nil, err
320- }
321-
322- if state.Responded {
323- return nil, nil
324- }
325-
326- return lastLog, nil
327- }
328-
329176// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
330177func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
331178 latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
0 commit comments