Adding support of transient federates #358
edwardalee
left a comment
This looks like a great start, but I have some doubts about the concurrency handling as indicated in the comments. I will have to finish reviewing later, but I think there is enough to work with to go ahead and submit this provisional review.
In the "files changed" view, you can add suggestions to a batch and commit them all at once. This way, we can avoid having lots of commits without a descriptive commit message.
You probably want to use an interactive rebase to …
…t federates, cont.
… but did not started yet
… is an absent transient
…TAG to issue is the NET. This is to avoid starvation.
…p_connections_from_federates
…r the effective start tag and the port and ip adddress of the peer federate
…is_transient. Use tags instead of booleans
… time is already reached
…ective start tag, port and address
…start tag, otherwise it stays forever. This case happens whenever an outbount transient has connected when waiting for the timestamp
…TAG and is drained.
…to the ACK response to P2P_FED_ID message
…sient + keep the current tag of an outbound transient"
This PR replaces #192, and supports the latest major refactoring of the RTI.
It implements the transient feature in the federation execution. Details of the implementation are documented in Discussion lf-lang/lingua-franca#2212.
This should be merged with lf-lang/lingua-franca#2213
Merged #574
This PR builds on the centralized transient support from PR #358 on the `transient-fed` branch, extending the `reactor-c` runtime to support transient federates under decentralized coordination, where all connections are P2P and no RTI message forwarding occurs. It also adds support for physical connections involving transient federates under centralized coordination, which are P2P connections as well.
lf-lang/lingua-franca#2609
Protocol extensions for decentralized transients
Connection
- `lf_connect_to_federate()`, called when executing the preamble, is updated to accept an `is_transient` flag; transient outbounds are queried once (no retry loop) and connected only when the RTI sends `MSG_TYPE_OUTBOUND_CONNECTED`. Non-transient outbounds still use the original pattern and keep retrying every `ADDRESS_QUERY_RETRY_INTERVAL` until connected.
- … `outbound_transients` array and updates `number_of_outbound_transients` accordingly.
- `MSG_TYPE_OUTBOUND_CONNECTED` and `MSG_TYPE_OUTBOUND_DISCONNECTED` are new messages. The RTI sends these to the federate whose downstream peers have connected or disconnected. When an outbound transient connects, the federate queries its address from the RTI. When it disconnects, the federate skips sending messages, thus avoiding an error writing to a broken pipe.
- `MSG_TYPE_ADDRESS_QUERY` is extended with an `is_transient` byte so the RTI can register the querying federate's outbound-transient relationships.
- `outbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]` and `inbound_p2p_connection_is_transient[NUMBER_OF_FEDERATES]` are added to `federate_instance_t`.

Adaptations
- `mark_inputs_known_absent()`: for inbound transients, the tag is set to `env->current_tag` instead of `FOREVER_TAG` when the P2P socket closes. `FOREVER_TAG` permanently blocks ports from being updated after a transient rejoins, causing spurious `Attempt to update to earlier tag` errors and outbound STP violations. `env->current_tag` is sufficient to unblock the scheduler while leaving the port open to future updates.
- `get_start_time_from_rti()`: `MSG_TYPE_OUTBOUND_CONNECTED` arriving during the start-time handshake is now deferred: the federate ID is drained from the socket immediately, but `lf_connect_to_federate()` (which itself reads from the RTI socket) is called only after `MSG_TYPE_TIMESTAMP` is received. This eliminates a race condition where the address-query reply consumed the timestamp bytes, leading to `Unexpected reply of type 2`.
- `notify_federate_disconnected()`: now calls `send_outbound_disconnected_locked()` in addition to the existing `send_upstream_disconnected_locked()` calls, so inbound federates of a departing transient close their outbound P2P sockets.
- … `WARNING: Failed to accept the socket. Invalid argument` that fired because `accept()` returns `EINVAL` when unblocked by the intentional `shutdown_socket()` call at end of execution.

Tracing and visualization (`fedsd`)

- `send_OUTBOUND_CONNECTED`/`receive_OUTBOUND_CONNECTED` and `send_OUTBOUND_DISCONNECTED`/`receive_OUTBOUND_DISCONNECTED` trace events are added to `trace_types.h` and `fedsd.py`.
- `P2P_MSG` arrows now connect sender and receiver correctly in `fedsd`: matching is done by physical-time ordering rather than `partner_id` (which is `-1` on both sides for direct P2P tracepoints).
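
The physical-time matching described for `P2P_MSG` arrows can be illustrated roughly as follows. This is a minimal sketch and not the actual `fedsd.py` code: the `TraceEvent` type, its field names, and the `match_p2p_arrows` helper are hypothetical. Pairing only events that carry the same tag is an extra assumption, made here to keep the example unambiguous when `partner_id` is unavailable.

```python
from typing import List, NamedTuple, Tuple


class TraceEvent(NamedTuple):
    """Hypothetical, simplified stand-in for one P2P_MSG trace row."""
    kind: str               # "send" or "receive"
    federate_id: int        # federate that logged the event
    physical_time: int      # physical timestamp in nanoseconds
    tag: Tuple[int, int]    # (logical time, microstep) of the message


def match_p2p_arrows(
    events: List[TraceEvent],
) -> List[Tuple[TraceEvent, TraceEvent]]:
    """Pair each send with the earliest unmatched receive that follows it.

    partner_id is not used, so candidates are ordered by physical time.
    A receive is eligible only if it was logged by a different federate,
    carries the same tag (assumption), and is not earlier than the send.
    """
    sends = sorted(
        (e for e in events if e.kind == "send"),
        key=lambda e: e.physical_time,
    )
    receives = sorted(
        (e for e in events if e.kind == "receive"),
        key=lambda e: e.physical_time,
    )

    matched = set()  # indices into `receives` that already have an arrow
    arrows = []
    for send in sends:
        for i, recv in enumerate(receives):
            if i in matched:
                continue
            if recv.federate_id == send.federate_id or recv.tag != send.tag:
                continue
            if recv.physical_time < send.physical_time:
                continue  # a receive cannot precede its send
            matched.add(i)
            arrows.append((send, recv))
            break
    return arrows
```

Processing sends in physical-time order and taking the earliest eligible receive keeps arrows from crossing for messages on the same connection. A send with no eligible receive, for example because the transient peer had already left, produces no arrow.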