Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::ClusterIds;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::NonDet;
26#[cfg(feature = "sim")]
27use crate::sim::SimReceiver;
28use crate::staging_util::get_this_crate;
29
30// same as the one in `hydro_std`, but internal use only
31fn track_membership<'a, C, L: Location<'a> + NoTick>(
32    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
33) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
34    membership.fold(
35        q!(|| false),
36        q!(|present, event| {
37            match event {
38                MembershipEvent::Joined => *present = true,
39                MembershipEvent::Left => *present = false,
40            }
41        }),
42    )
43}
44
45fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
46    let root = get_this_crate();
47
48    if is_demux {
49        parse_quote! {
50            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
51                |(id, data)| {
52                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
53                }
54            )
55        }
56    } else {
57        parse_quote! {
58            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
59                |data| {
60                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
61                }
62            )
63        }
64    }
65}
66
67pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
68    serialize_bincode_with_type(is_demux, &quote_type::<T>())
69}
70
71fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
72    let root = get_this_crate();
73    if let Some(c_type) = tagged {
74        parse_quote! {
75            |res| {
76                let (id, b) = res.unwrap();
77                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
78            }
79        }
80    } else {
81        parse_quote! {
82            |res| {
83                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
84            }
85        }
86    }
87}
88
89pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
90    deserialize_bincode_with_type(tagged, &quote_type::<T>())
91}
92
93impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
94    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
95    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
96    /// using [`bincode`] to serialize/deserialize messages.
97    ///
98    /// The returned stream captures the elements received at the destination, where values will
99    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
100    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
101    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
102    /// dropped no further messages will be sent.
103    ///
104    /// # Example
105    /// ```rust
106    /// # #[cfg(feature = "deploy")] {
107    /// # use hydro_lang::prelude::*;
108    /// # use futures::StreamExt;
109    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
110    /// let p1 = flow.process::<()>();
111    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
112    /// let p2 = flow.process::<()>();
113    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
114    /// // 1, 2, 3
115    /// # on_p2.send_bincode(&p_out)
116    /// # }, |mut stream| async move {
117    /// # for w in 1..=3 {
118    /// #     assert_eq!(stream.next().await, Some(w));
119    /// # }
120    /// # }));
121    /// # }
122    /// ```
123    pub fn send_bincode<L2>(
124        self,
125        other: &Process<'a, L2>,
126    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
127    where
128        T: Serialize + DeserializeOwned,
129    {
130        self.send(other, TCP.fail_stop().bincode())
131    }
132
133    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
134    /// using the configuration in `via` to set up the message transport.
135    ///
136    /// The returned stream captures the elements received at the destination, where values will
137    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
138    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
139    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
140    /// dropped no further messages will be sent.
141    ///
142    /// # Example
143    /// ```rust
144    /// # #[cfg(feature = "deploy")] {
145    /// # use hydro_lang::prelude::*;
146    /// # use futures::StreamExt;
147    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
148    /// let p1 = flow.process::<()>();
149    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
150    /// let p2 = flow.process::<()>();
151    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
152    /// // 1, 2, 3
153    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
154    /// # }, |mut stream| async move {
155    /// # for w in 1..=3 {
156    /// #     assert_eq!(stream.next().await, Some(w));
157    /// # }
158    /// # }));
159    /// # }
160    /// ```
161    pub fn send<L2, N: NetworkFor<T>>(
162        self,
163        to: &Process<'a, L2>,
164        via: N,
165    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
166    where
167        T: Serialize + DeserializeOwned,
168        O: MinOrder<N::OrderingGuarantee>,
169    {
170        let serialize_pipeline = Some(N::serialize_thunk(false));
171        let deserialize_pipeline = Some(N::deserialize_thunk(None));
172
173        let name = via.name();
174        if to.multiversioned() && name.is_none() {
175            panic!(
176                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
177            );
178        }
179
180        Stream::new(
181            to.clone(),
182            HydroNode::Network {
183                name: name.map(ToOwned::to_owned),
184                networking_info: N::networking_info(),
185                serialize_fn: serialize_pipeline.map(|e| e.into()),
186                instantiate_fn: DebugInstantiate::Building,
187                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
188                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
189                metadata: to.new_node_metadata(Stream::<
190                    T,
191                    Process<'a, L2>,
192                    Unbounded,
193                    <O as MinOrder<N::OrderingGuarantee>>::Min,
194                    R,
195                >::collection_kind()),
196            },
197        )
198    }
199
200    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
201    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
202    /// using [`bincode`] to serialize/deserialize messages.
203    ///
204    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
205    /// membership information. This is a common pattern in distributed systems for broadcasting data to
206    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
207    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
208    /// each element to all cluster members.
209    ///
210    /// # Non-Determinism
211    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
212    /// to the current cluster members _at that point in time_. Depending on when we are notified of
213    /// membership changes, we will broadcast each element to different members.
214    ///
215    /// # Example
216    /// ```rust
217    /// # #[cfg(feature = "deploy")] {
218    /// # use hydro_lang::prelude::*;
219    /// # use futures::StreamExt;
220    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
221    /// let p1 = flow.process::<()>();
222    /// let workers: Cluster<()> = flow.cluster::<()>();
223    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
224    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
225    /// # on_worker.send_bincode(&p2).entries()
226    /// // if there are 4 members in the cluster, each receives one element
227    /// // - MemberId::<()>(0): [123]
228    /// // - MemberId::<()>(1): [123]
229    /// // - MemberId::<()>(2): [123]
230    /// // - MemberId::<()>(3): [123]
231    /// # }, |mut stream| async move {
232    /// # let mut results = Vec::new();
233    /// # for w in 0..4 {
234    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
235    /// # }
236    /// # results.sort();
237    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
238    /// # }));
239    /// # }
240    /// ```
241    pub fn broadcast_bincode<L2: 'a>(
242        self,
243        other: &Cluster<'a, L2>,
244        nondet_membership: NonDet,
245    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
246    where
247        T: Clone + Serialize + DeserializeOwned,
248    {
249        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
250    }
251
252    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
253    /// using the configuration in `via` to set up the message transport.
254    ///
255    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
256    /// membership information. This is a common pattern in distributed systems for broadcasting data to
257    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
258    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
259    /// each element to all cluster members.
260    ///
261    /// # Non-Determinism
262    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
263    /// to the current cluster members _at that point in time_. Depending on when we are notified of
264    /// membership changes, we will broadcast each element to different members.
265    ///
266    /// # Example
267    /// ```rust
268    /// # #[cfg(feature = "deploy")] {
269    /// # use hydro_lang::prelude::*;
270    /// # use futures::StreamExt;
271    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
272    /// let p1 = flow.process::<()>();
273    /// let workers: Cluster<()> = flow.cluster::<()>();
274    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
275    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
276    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
277    /// // if there are 4 members in the cluster, each receives one element
278    /// // - MemberId::<()>(0): [123]
279    /// // - MemberId::<()>(1): [123]
280    /// // - MemberId::<()>(2): [123]
281    /// // - MemberId::<()>(3): [123]
282    /// # }, |mut stream| async move {
283    /// # let mut results = Vec::new();
284    /// # for w in 0..4 {
285    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
286    /// # }
287    /// # results.sort();
288    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
289    /// # }));
290    /// # }
291    /// ```
292    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
293        self,
294        to: &Cluster<'a, L2>,
295        via: N,
296        nondet_membership: NonDet,
297    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
298    where
299        T: Clone + Serialize + DeserializeOwned,
300        O: MinOrder<N::OrderingGuarantee>,
301    {
302        let ids = track_membership(self.location.source_cluster_members(to));
303        sliced! {
304            let members_snapshot = use(ids, nondet_membership);
305            let elements = use(self, nondet_membership);
306
307            let current_members = members_snapshot.filter(q!(|b| *b));
308            elements.repeat_with_keys(current_members)
309        }
310        .demux(to, via)
311    }
312
313    /// Broadcasts elements of this stream to all members of a cluster,
314    /// assuming membership is closed (fixed at deploy time).
315    ///
316    /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
317    /// The membership set is obtained from deploy metadata via
318    /// [`ClusterIds`], producing a
319    /// `Bounded` stream. The cross-product of data × members is fully
320    /// deterministic.
321    ///
322    /// This is only available in deployment targets with static cluster
323    /// membership (legacy Hydro Deploy and simulation). There are no late
324    /// joiners in that context, so broadcast receivers are guaranteed to
325    /// get data from the start of the stream. On dynamic targets
326    /// (e.g. ECS), use [`Stream::broadcast`] instead.
327    ///
328    /// # Example
329    /// ```rust
330    /// # #[cfg(feature = "deploy")] {
331    /// # use hydro_lang::prelude::*;
332    /// # use futures::StreamExt;
333    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
334    /// let p1 = flow.process::<()>();
335    /// let workers: Cluster<()> = flow.cluster::<()>();
336    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
337    /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
338    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
339    /// // each of the 4 cluster members receives 123
340    /// # }, |mut stream| async move {
341    /// # let mut results = Vec::new();
342    /// # for _ in 0..4 {
343    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
344    /// # }
345    /// # results.sort();
346    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
347    /// # }));
348    /// # }
349    /// ```
350    pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
351        self,
352        to: &Cluster<'a, L2>,
353        via: N,
354    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
355    where
356        T: Clone + Serialize + DeserializeOwned,
357        O: MinOrder<N::OrderingGuarantee>,
358    {
359        let cluster_ids = ClusterIds {
360            key: to.key,
361            _phantom: PhantomData,
362        };
363        let member_ids = self.location.source_iter(q!(cluster_ids
364            .iter()
365            .map(|id| MemberId::from_tagless(id.clone()))));
366
367        // Late joiners will receive no data from this broadcast, which is
368        // future-monotone and eventually consistent (a safe under-approximation).
369        self.cross_product(member_ids.weaken_retries())
370            .map(q!(|(data, member_id)| (member_id, data)))
371            .into_keyed()
372            .demux(to, via)
373    }
374
375    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
376    /// serialization. The external process can receive these elements by establishing a TCP
377    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
378    ///
379    /// # Example
380    /// ```rust
381    /// # #[cfg(feature = "deploy")] {
382    /// # use hydro_lang::prelude::*;
383    /// # use futures::StreamExt;
384    /// # tokio_test::block_on(async move {
385    /// let mut flow = FlowBuilder::new();
386    /// let process = flow.process::<()>();
387    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
388    /// let external = flow.external::<()>();
389    /// let external_handle = numbers.send_bincode_external(&external);
390    ///
391    /// let mut deployment = hydro_deploy::Deployment::new();
392    /// let nodes = flow
393    ///     .with_process(&process, deployment.Localhost())
394    ///     .with_external(&external, deployment.Localhost())
395    ///     .deploy(&mut deployment);
396    ///
397    /// deployment.deploy().await.unwrap();
398    /// // establish the TCP connection
399    /// let mut external_recv_stream = nodes.connect(external_handle).await;
400    /// deployment.start().await.unwrap();
401    ///
402    /// for w in 1..=3 {
403    ///     assert_eq!(external_recv_stream.next().await, Some(w));
404    /// }
405    /// # });
406    /// # }
407    /// ```
408    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
409    where
410        T: Serialize + DeserializeOwned,
411    {
412        let serialize_pipeline = Some(serialize_bincode::<T>(false));
413
414        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
415
416        let external_port_id = flow_state_borrow.next_external_port();
417
418        flow_state_borrow.push_root(HydroRoot::SendExternal {
419            to_external_key: other.key,
420            to_port_id: external_port_id,
421            to_many: false,
422            unpaired: true,
423            serialize_fn: serialize_pipeline.map(|e| e.into()),
424            instantiate_fn: DebugInstantiate::Building,
425            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
426            op_metadata: HydroIrOpMetadata::new(),
427        });
428
429        ExternalBincodeStream {
430            process_key: other.key,
431            port_id: external_port_id,
432            _phantom: PhantomData,
433        }
434    }
435
436    #[cfg(feature = "sim")]
437    /// Sets up a simulation output port for this stream, allowing test code to receive elements
438    /// sent to this stream during simulation.
439    pub fn sim_output(self) -> SimReceiver<T, O, R>
440    where
441        T: Serialize + DeserializeOwned,
442    {
443        let external_location: External<'a, ()> = External {
444            key: LocationKey::FIRST,
445            flow_state: self.location.flow_state().clone(),
446            _phantom: PhantomData,
447        };
448
449        let external = self.send_bincode_external(&external_location);
450
451        SimReceiver(external.port_id, PhantomData)
452    }
453}
454
455impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
456    /// Creates an external output for embedded deployment mode.
457    ///
458    /// The `name` parameter specifies the name of the field in the generated
459    /// `EmbeddedOutputs` struct that will receive elements from this stream.
460    /// The generated function will accept an `EmbeddedOutputs` struct with an
461    /// `impl FnMut(T)` field with this name.
462    pub fn embedded_output(self, name: impl Into<String>) {
463        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
464
465        self.location
466            .flow_state()
467            .borrow_mut()
468            .push_root(HydroRoot::EmbeddedOutput {
469                ident,
470                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
471                op_metadata: HydroIrOpMetadata::new(),
472            });
473    }
474}
475
476impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
477    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
478{
479    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
480    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
481    /// using [`bincode`] to serialize/deserialize messages.
482    ///
483    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
484    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
485    /// this API allows precise targeting of specific cluster members rather than broadcasting to
486    /// all members.
487    ///
488    /// # Example
489    /// ```rust
490    /// # #[cfg(feature = "deploy")] {
491    /// # use hydro_lang::prelude::*;
492    /// # use futures::StreamExt;
493    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
494    /// let p1 = flow.process::<()>();
495    /// let workers: Cluster<()> = flow.cluster::<()>();
496    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
497    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
498    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
499    ///     .demux_bincode(&workers);
500    /// # on_worker.send_bincode(&p2).entries()
501    /// // if there are 4 members in the cluster, each receives one element
502    /// // - MemberId::<()>(0): [0]
503    /// // - MemberId::<()>(1): [1]
504    /// // - MemberId::<()>(2): [2]
505    /// // - MemberId::<()>(3): [3]
506    /// # }, |mut stream| async move {
507    /// # let mut results = Vec::new();
508    /// # for w in 0..4 {
509    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
510    /// # }
511    /// # results.sort();
512    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
513    /// # }));
514    /// # }
515    /// ```
516    pub fn demux_bincode(
517        self,
518        other: &Cluster<'a, L2>,
519    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
520    where
521        T: Serialize + DeserializeOwned,
522    {
523        self.demux(other, TCP.fail_stop().bincode())
524    }
525
526    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
527    /// using the configuration in `via` to set up the message transport.
528    ///
529    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
530    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
531    /// this API allows precise targeting of specific cluster members rather than broadcasting to
532    /// all members.
533    ///
534    /// # Example
535    /// ```rust
536    /// # #[cfg(feature = "deploy")] {
537    /// # use hydro_lang::prelude::*;
538    /// # use futures::StreamExt;
539    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
540    /// let p1 = flow.process::<()>();
541    /// let workers: Cluster<()> = flow.cluster::<()>();
542    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
543    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
544    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
545    ///     .demux(&workers, TCP.fail_stop().bincode());
546    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
547    /// // if there are 4 members in the cluster, each receives one element
548    /// // - MemberId::<()>(0): [0]
549    /// // - MemberId::<()>(1): [1]
550    /// // - MemberId::<()>(2): [2]
551    /// // - MemberId::<()>(3): [3]
552    /// # }, |mut stream| async move {
553    /// # let mut results = Vec::new();
554    /// # for w in 0..4 {
555    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
556    /// # }
557    /// # results.sort();
558    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
559    /// # }));
560    /// # }
561    /// ```
562    pub fn demux<N: NetworkFor<T>>(
563        self,
564        to: &Cluster<'a, L2>,
565        via: N,
566    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
567    where
568        T: Serialize + DeserializeOwned,
569        O: MinOrder<N::OrderingGuarantee>,
570    {
571        self.into_keyed().demux(to, via)
572    }
573}
574
575impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
576    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
577    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
578    /// [`bincode`] to serialize/deserialize messages.
579    ///
580    /// This provides load balancing by evenly distributing work across cluster members. The
581    /// distribution is deterministic based on element order - the first element goes to member 0,
582    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
583    ///
584    /// # Non-Determinism
585    /// The set of cluster members may asynchronously change over time. Each element is distributed
586    /// based on the current cluster membership _at that point in time_. Depending on when cluster
587    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
588    /// membership is stable, the order of members in the round-robin pattern may change across runs.
589    ///
590    /// # Ordering Requirements
591    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
592    /// order of messages and retries affects the round-robin pattern.
593    ///
594    /// # Example
595    /// ```rust
596    /// # #[cfg(feature = "deploy")] {
597    /// # use hydro_lang::prelude::*;
598    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
599    /// # use futures::StreamExt;
600    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
601    /// let p1 = flow.process::<()>();
602    /// let workers: Cluster<()> = flow.cluster::<()>();
603    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
604    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
605    /// on_worker.send_bincode(&p2)
606    /// # .first().values() // we use first to assert that each member gets one element
607    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
608    /// // - MemberId::<()>(?): [1]
609    /// // - MemberId::<()>(?): [2]
610    /// // - MemberId::<()>(?): [3]
611    /// // - MemberId::<()>(?): [4]
612    /// # }, |mut stream| async move {
613    /// # let mut results = Vec::new();
614    /// # for w in 0..4 {
615    /// #     results.push(stream.next().await.unwrap());
616    /// # }
617    /// # results.sort();
618    /// # assert_eq!(results, vec![1, 2, 3, 4]);
619    /// # }));
620    /// # }
621    /// ```
622    pub fn round_robin_bincode<L2: 'a>(
623        self,
624        other: &Cluster<'a, L2>,
625        nondet_membership: NonDet,
626    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
627    where
628        T: Serialize + DeserializeOwned,
629    {
630        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
631    }
632
633    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
634    /// the configuration in `via` to set up the message transport.
635    ///
636    /// This provides load balancing by evenly distributing work across cluster members. The
637    /// distribution is deterministic based on element order - the first element goes to member 0,
638    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
639    ///
640    /// # Non-Determinism
641    /// The set of cluster members may asynchronously change over time. Each element is distributed
642    /// based on the current cluster membership _at that point in time_. Depending on when cluster
643    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
644    /// membership is stable, the order of members in the round-robin pattern may change across runs.
645    ///
646    /// # Ordering Requirements
647    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
648    /// order of messages and retries affects the round-robin pattern.
649    ///
650    /// # Example
651    /// ```rust
652    /// # #[cfg(feature = "deploy")] {
653    /// # use hydro_lang::prelude::*;
654    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
655    /// # use futures::StreamExt;
656    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
657    /// let p1 = flow.process::<()>();
658    /// let workers: Cluster<()> = flow.cluster::<()>();
659    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
660    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
661    /// on_worker.send(&p2, TCP.fail_stop().bincode())
662    /// # .first().values() // we use first to assert that each member gets one element
663    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
664    /// // - MemberId::<()>(?): [1]
665    /// // - MemberId::<()>(?): [2]
666    /// // - MemberId::<()>(?): [3]
667    /// // - MemberId::<()>(?): [4]
668    /// # }, |mut stream| async move {
669    /// # let mut results = Vec::new();
670    /// # for w in 0..4 {
671    /// #     results.push(stream.next().await.unwrap());
672    /// # }
673    /// # results.sort();
674    /// # assert_eq!(results, vec![1, 2, 3, 4]);
675    /// # }));
676    /// # }
677    /// ```
678    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
679        self,
680        to: &Cluster<'a, L2>,
681        via: N,
682        nondet_membership: NonDet,
683    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
684    where
685        T: Serialize + DeserializeOwned,
686    {
687        let ids = track_membership(self.location.source_cluster_members(to));
688        sliced! {
689            let members_snapshot = use(ids, nondet_membership);
690            let elements = use(self.enumerate(), nondet_membership);
691
692            let current_members = members_snapshot
693                .filter(q!(|b| *b))
694                .keys()
695                .assume_ordering::<TotalOrder>(nondet_membership)
696                .collect_vec();
697
698            elements
699                .cross_singleton(current_members)
700                .filter_map(q!(|(data, members)| {
701                    if members.is_empty() {
702                        None
703                    } else {
704                        Some((members[data.0 % members.len()].clone(), data.1))
705                    }
706                }))
707        }
708        .demux(to, via)
709    }
710}
711
712impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
713    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
714    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
715    /// [`bincode`] to serialize/deserialize messages.
716    ///
717    /// This provides load balancing by evenly distributing work across cluster members. The
718    /// distribution is deterministic based on element order - the first element goes to member 0,
719    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
720    ///
721    /// # Non-Determinism
722    /// The set of cluster members may asynchronously change over time. Each element is distributed
723    /// based on the current cluster membership _at that point in time_. Depending on when cluster
724    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
725    /// membership is stable, the order of members in the round-robin pattern may change across runs.
726    ///
727    /// # Ordering Requirements
728    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
729    /// order of messages and retries affects the round-robin pattern.
730    ///
731    /// # Example
732    /// ```rust
733    /// # #[cfg(feature = "deploy")] {
734    /// # use hydro_lang::prelude::*;
735    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
736    /// # use hydro_lang::location::MemberId;
737    /// # use futures::StreamExt;
738    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
739    /// let p1 = flow.process::<()>();
740    /// let workers1: Cluster<()> = flow.cluster::<()>();
741    /// let workers2: Cluster<()> = flow.cluster::<()>();
742    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
743    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
744    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
745    /// on_worker2.send_bincode(&p2)
746    /// # .entries()
747    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
748    /// # }, |mut stream| async move {
749    /// # let mut results = Vec::new();
750    /// # let mut locations = std::collections::HashSet::new();
751    /// # for w in 0..=16 {
752    /// #     let (location, v) = stream.next().await.unwrap();
753    /// #     locations.insert(location);
754    /// #     results.push(v);
755    /// # }
756    /// # results.sort();
757    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
758    /// # assert_eq!(locations.len(), 16);
759    /// # }));
760    /// # }
761    /// ```
762    pub fn round_robin_bincode<L2: 'a>(
763        self,
764        other: &Cluster<'a, L2>,
765        nondet_membership: NonDet,
766    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
767    where
768        T: Serialize + DeserializeOwned,
769    {
770        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
771    }
772
773    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
774    /// the configuration in `via` to set up the message transport.
775    ///
776    /// This provides load balancing by evenly distributing work across cluster members. The
777    /// distribution is deterministic based on element order - the first element goes to member 0,
778    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
779    ///
780    /// # Non-Determinism
781    /// The set of cluster members may asynchronously change over time. Each element is distributed
782    /// based on the current cluster membership _at that point in time_. Depending on when cluster
783    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
784    /// membership is stable, the order of members in the round-robin pattern may change across runs.
785    ///
786    /// # Ordering Requirements
787    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
788    /// order of messages and retries affects the round-robin pattern.
789    ///
790    /// # Example
791    /// ```rust
792    /// # #[cfg(feature = "deploy")] {
793    /// # use hydro_lang::prelude::*;
794    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
795    /// # use hydro_lang::location::MemberId;
796    /// # use futures::StreamExt;
797    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
798    /// let p1 = flow.process::<()>();
799    /// let workers1: Cluster<()> = flow.cluster::<()>();
800    /// let workers2: Cluster<()> = flow.cluster::<()>();
801    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
802    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
803    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
804    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
805    /// # .entries()
806    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
807    /// # }, |mut stream| async move {
808    /// # let mut results = Vec::new();
809    /// # let mut locations = std::collections::HashSet::new();
810    /// # for w in 0..=16 {
811    /// #     let (location, v) = stream.next().await.unwrap();
812    /// #     locations.insert(location);
813    /// #     results.push(v);
814    /// # }
815    /// # results.sort();
816    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
817    /// # assert_eq!(locations.len(), 16);
818    /// # }));
819    /// # }
820    /// ```
821    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
822        self,
823        to: &Cluster<'a, L2>,
824        via: N,
825        nondet_membership: NonDet,
826    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
827    where
828        T: Serialize + DeserializeOwned,
829    {
830        let ids = track_membership(self.location.source_cluster_members(to));
831        sliced! {
832            let members_snapshot = use(ids, nondet_membership);
833            let elements = use(self.enumerate(), nondet_membership);
834
835            let current_members = members_snapshot
836                .filter(q!(|b| *b))
837                .keys()
838                .assume_ordering::<TotalOrder>(nondet_membership)
839                .collect_vec();
840
841            elements
842                .cross_singleton(current_members)
843                .filter_map(q!(|(data, members)| {
844                    if members.is_empty() {
845                        None
846                    } else {
847                        Some((members[data.0 % members.len()].clone(), data.1))
848                    }
849                }))
850        }
851        .demux(to, via)
852    }
853}
854
855impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
856    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
857    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
858    /// using [`bincode`] to serialize/deserialize messages.
859    ///
860    /// Each cluster member sends its local stream elements, and they are collected at the destination
861    /// as a [`KeyedStream`] where keys identify the source cluster member.
862    ///
863    /// # Example
864    /// ```rust
865    /// # #[cfg(feature = "deploy")] {
866    /// # use hydro_lang::prelude::*;
867    /// # use futures::StreamExt;
868    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
869    /// let workers: Cluster<()> = flow.cluster::<()>();
870    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
871    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
872    /// # all_received.entries()
873    /// # }, |mut stream| async move {
874    /// // if there are 4 members in the cluster, we should receive 4 elements
875    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
876    /// # let mut results = Vec::new();
877    /// # for w in 0..4 {
878    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
879    /// # }
880    /// # results.sort();
881    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
882    /// # }));
883    /// # }
884    /// ```
885    ///
886    /// If you don't need to know the source for each element, you can use `.values()`
887    /// to get just the data:
888    /// ```rust
889    /// # #[cfg(feature = "deploy")] {
890    /// # use hydro_lang::prelude::*;
891    /// # use hydro_lang::live_collections::stream::NoOrder;
892    /// # use futures::StreamExt;
893    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
894    /// # let workers: Cluster<()> = flow.cluster::<()>();
895    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
896    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
897    /// # values
898    /// # }, |mut stream| async move {
899    /// # let mut results = Vec::new();
900    /// # for w in 0..4 {
901    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
902    /// # }
903    /// # results.sort();
904    /// // if there are 4 members in the cluster, we should receive 4 elements
905    /// // 1, 1, 1, 1
906    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
907    /// # }));
908    /// # }
909    /// ```
910    pub fn send_bincode<L2>(
911        self,
912        other: &Process<'a, L2>,
913    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
914    where
915        T: Serialize + DeserializeOwned,
916    {
917        self.send(other, TCP.fail_stop().bincode())
918    }
919
920    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
921    /// using the configuration in `via` to set up the message transport.
922    ///
923    /// Each cluster member sends its local stream elements, and they are collected at the destination
924    /// as a [`KeyedStream`] where keys identify the source cluster member.
925    ///
926    /// # Example
927    /// ```rust
928    /// # #[cfg(feature = "deploy")] {
929    /// # use hydro_lang::prelude::*;
930    /// # use futures::StreamExt;
931    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
932    /// let workers: Cluster<()> = flow.cluster::<()>();
933    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
934    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
935    /// # all_received.entries()
936    /// # }, |mut stream| async move {
937    /// // if there are 4 members in the cluster, we should receive 4 elements
938    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
939    /// # let mut results = Vec::new();
940    /// # for w in 0..4 {
941    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
942    /// # }
943    /// # results.sort();
944    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
945    /// # }));
946    /// # }
947    /// ```
948    ///
949    /// If you don't need to know the source for each element, you can use `.values()`
950    /// to get just the data:
951    /// ```rust
952    /// # #[cfg(feature = "deploy")] {
953    /// # use hydro_lang::prelude::*;
954    /// # use hydro_lang::live_collections::stream::NoOrder;
955    /// # use futures::StreamExt;
956    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
957    /// # let workers: Cluster<()> = flow.cluster::<()>();
958    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
959    /// let values: Stream<i32, _, _, NoOrder> =
960    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
961    /// # values
962    /// # }, |mut stream| async move {
963    /// # let mut results = Vec::new();
964    /// # for w in 0..4 {
965    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
966    /// # }
967    /// # results.sort();
968    /// // if there are 4 members in the cluster, we should receive 4 elements
969    /// // 1, 1, 1, 1
970    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
971    /// # }));
972    /// # }
973    /// ```
974    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
975    pub fn send<L2, N: NetworkFor<T>>(
976        self,
977        to: &Process<'a, L2>,
978        via: N,
979    ) -> KeyedStream<
980        MemberId<L>,
981        T,
982        Process<'a, L2>,
983        Unbounded,
984        <O as MinOrder<N::OrderingGuarantee>>::Min,
985        R,
986    >
987    where
988        T: Serialize + DeserializeOwned,
989        O: MinOrder<N::OrderingGuarantee>,
990    {
991        let serialize_pipeline = Some(N::serialize_thunk(false));
992
993        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
994
995        let name = via.name();
996        if to.multiversioned() && name.is_none() {
997            panic!(
998                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
999            );
1000        }
1001
1002        let raw_stream: Stream<
1003            (MemberId<L>, T),
1004            Process<'a, L2>,
1005            Unbounded,
1006            <O as MinOrder<N::OrderingGuarantee>>::Min,
1007            R,
1008        > = Stream::new(
1009            to.clone(),
1010            HydroNode::Network {
1011                name: name.map(ToOwned::to_owned),
1012                networking_info: N::networking_info(),
1013                serialize_fn: serialize_pipeline.map(|e| e.into()),
1014                instantiate_fn: DebugInstantiate::Building,
1015                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1016                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1017                metadata: to.new_node_metadata(Stream::<
1018                    (MemberId<L>, T),
1019                    Process<'a, L2>,
1020                    Unbounded,
1021                    <O as MinOrder<N::OrderingGuarantee>>::Min,
1022                    R,
1023                >::collection_kind()),
1024            },
1025        );
1026
1027        raw_stream.into_keyed()
1028    }
1029
1030    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1031    /// Broadcasts elements of this stream at each source member to all members of a destination
1032    /// cluster, using [`bincode`] to serialize/deserialize messages.
1033    ///
1034    /// Each source member sends each of its stream elements to **every** member of the cluster
1035    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1036    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1037    /// **only data elements** and sends each element to all cluster members.
1038    ///
1039    /// # Non-Determinism
1040    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1041    /// to the current cluster members known _at that point in time_ at the source member. Depending
1042    /// on when each source member is notified of membership changes, it will broadcast each element
1043    /// to different members.
1044    ///
1045    /// # Example
1046    /// ```rust
1047    /// # #[cfg(feature = "deploy")] {
1048    /// # use hydro_lang::prelude::*;
1049    /// # use hydro_lang::location::MemberId;
1050    /// # use futures::StreamExt;
1051    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1052    /// # type Source = ();
1053    /// # type Destination = ();
1054    /// let source: Cluster<Source> = flow.cluster::<Source>();
1055    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1056    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1057    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1058    /// # on_destination.entries().send_bincode(&p2).entries()
1059    /// // if there are 4 members in the destination, each receives one element from each source member
1060    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1061    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1062    /// // - ...
1063    /// # }, |mut stream| async move {
1064    /// # let mut results = Vec::new();
1065    /// # for w in 0..16 {
1066    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1067    /// # }
1068    /// # results.sort();
1069    /// # assert_eq!(results, vec![
1070    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1071    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1072    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1073    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1074    /// # ]);
1075    /// # }));
1076    /// # }
1077    /// ```
1078    pub fn broadcast_bincode<L2: 'a>(
1079        self,
1080        other: &Cluster<'a, L2>,
1081        nondet_membership: NonDet,
1082    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1083    where
1084        T: Clone + Serialize + DeserializeOwned,
1085    {
1086        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1087    }
1088
1089    /// Broadcasts elements of this stream at each source member to all members of a destination
1090    /// cluster, using the configuration in `via` to set up the message transport.
1091    ///
1092    /// Each source member sends each of its stream elements to **every** member of the cluster
1093    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1094    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1095    /// **only data elements** and sends each element to all cluster members.
1096    ///
1097    /// # Non-Determinism
1098    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1099    /// to the current cluster members known _at that point in time_ at the source member. Depending
1100    /// on when each source member is notified of membership changes, it will broadcast each element
1101    /// to different members.
1102    ///
1103    /// # Example
1104    /// ```rust
1105    /// # #[cfg(feature = "deploy")] {
1106    /// # use hydro_lang::prelude::*;
1107    /// # use hydro_lang::location::MemberId;
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1110    /// # type Source = ();
1111    /// # type Destination = ();
1112    /// let source: Cluster<Source> = flow.cluster::<Source>();
1113    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1114    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1115    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1116    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1117    /// // if there are 4 members in the destination, each receives one element from each source member
1118    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1119    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1120    /// // - ...
1121    /// # }, |mut stream| async move {
1122    /// # let mut results = Vec::new();
1123    /// # for w in 0..16 {
1124    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1125    /// # }
1126    /// # results.sort();
1127    /// # assert_eq!(results, vec![
1128    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1129    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1130    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1131    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1132    /// # ]);
1133    /// # }));
1134    /// # }
1135    /// ```
1136    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1137    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1138        self,
1139        to: &Cluster<'a, L2>,
1140        via: N,
1141        nondet_membership: NonDet,
1142    ) -> KeyedStream<
1143        MemberId<L>,
1144        T,
1145        Cluster<'a, L2>,
1146        Unbounded,
1147        <O as MinOrder<N::OrderingGuarantee>>::Min,
1148        R,
1149    >
1150    where
1151        T: Clone + Serialize + DeserializeOwned,
1152        O: MinOrder<N::OrderingGuarantee>,
1153    {
1154        let ids = track_membership(self.location.source_cluster_members(to));
1155        sliced! {
1156            let members_snapshot = use(ids, nondet_membership);
1157            let elements = use(self, nondet_membership);
1158
1159            let current_members = members_snapshot.filter(q!(|b| *b));
1160            elements.repeat_with_keys(current_members)
1161        }
1162        .demux(to, via)
1163    }
1164
1165    #[cfg(feature = "sim")]
1166    /// Sends elements of this cluster stream to an external location using bincode serialization.
1167    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1168    where
1169        T: Serialize + DeserializeOwned,
1170    {
1171        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1172
1173        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1174
1175        let external_port_id = flow_state_borrow.next_external_port();
1176
1177        flow_state_borrow.push_root(HydroRoot::SendExternal {
1178            to_external_key: other.key,
1179            to_port_id: external_port_id,
1180            to_many: false,
1181            unpaired: true,
1182            serialize_fn: serialize_pipeline.map(|e| e.into()),
1183            instantiate_fn: DebugInstantiate::Building,
1184            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1185            op_metadata: HydroIrOpMetadata::new(),
1186        });
1187
1188        ExternalBincodeStream {
1189            process_key: other.key,
1190            port_id: external_port_id,
1191            _phantom: PhantomData,
1192        }
1193    }
1194
1195    #[cfg(feature = "sim")]
1196    /// Sets up a simulation output port for this cluster stream, allowing test code
1197    /// to receive `(member_id, T)` pairs during simulation.
1198    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1199    where
1200        T: Serialize + DeserializeOwned,
1201    {
1202        let external_location: External<'a, ()> = External {
1203            key: LocationKey::FIRST,
1204            flow_state: self.location.flow_state().clone(),
1205            _phantom: PhantomData,
1206        };
1207
1208        let external = self.send_bincode_external(&external_location);
1209
1210        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1211    }
1212}
1213
1214impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1215    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1216{
1217    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1218    /// Sends elements of this stream at each source member to specific members of a destination
1219    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1220    ///
1221    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1222    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1223    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1224    /// all members.
1225    ///
1226    /// Each cluster member sends its local stream elements, and they are collected at each
1227    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1228    ///
1229    /// # Example
1230    /// ```rust
1231    /// # #[cfg(feature = "deploy")] {
1232    /// # use hydro_lang::prelude::*;
1233    /// # use futures::StreamExt;
1234    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1235    /// # type Source = ();
1236    /// # type Destination = ();
1237    /// let source: Cluster<Source> = flow.cluster::<Source>();
1238    /// let to_send: Stream<_, Cluster<_>, _> = source
1239    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1240    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1241    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1242    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1243    /// # all_received.entries().send_bincode(&p2).entries()
1244    /// # }, |mut stream| async move {
1245    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1246    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1247    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1248    /// // - ...
1249    /// # let mut results = Vec::new();
1250    /// # for w in 0..16 {
1251    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1252    /// # }
1253    /// # results.sort();
1254    /// # assert_eq!(results, vec![
1255    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1256    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1257    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1258    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1259    /// # ]);
1260    /// # }));
1261    /// # }
1262    /// ```
1263    pub fn demux_bincode(
1264        self,
1265        other: &Cluster<'a, L2>,
1266    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1267    where
1268        T: Serialize + DeserializeOwned,
1269    {
1270        self.demux(other, TCP.fail_stop().bincode())
1271    }
1272
1273    /// Sends elements of this stream at each source member to specific members of a destination
1274    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1275    /// message transport.
1276    ///
1277    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1278    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1279    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1280    /// all members.
1281    ///
1282    /// Each cluster member sends its local stream elements, and they are collected at each
1283    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1284    ///
1285    /// # Example
1286    /// ```rust
1287    /// # #[cfg(feature = "deploy")] {
1288    /// # use hydro_lang::prelude::*;
1289    /// # use futures::StreamExt;
1290    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1291    /// # type Source = ();
1292    /// # type Destination = ();
1293    /// let source: Cluster<Source> = flow.cluster::<Source>();
1294    /// let to_send: Stream<_, Cluster<_>, _> = source
1295    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1296    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1297    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1298    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1299    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1300    /// # }, |mut stream| async move {
1301    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1302    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1303    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1304    /// // - ...
1305    /// # let mut results = Vec::new();
1306    /// # for w in 0..16 {
1307    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1308    /// # }
1309    /// # results.sort();
1310    /// # assert_eq!(results, vec![
1311    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1312    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1313    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1314    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1315    /// # ]);
1316    /// # }));
1317    /// # }
1318    /// ```
1319    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1320    pub fn demux<N: NetworkFor<T>>(
1321        self,
1322        to: &Cluster<'a, L2>,
1323        via: N,
1324    ) -> KeyedStream<
1325        MemberId<L>,
1326        T,
1327        Cluster<'a, L2>,
1328        Unbounded,
1329        <O as MinOrder<N::OrderingGuarantee>>::Min,
1330        R,
1331    >
1332    where
1333        T: Serialize + DeserializeOwned,
1334        O: MinOrder<N::OrderingGuarantee>,
1335    {
1336        self.into_keyed().demux(to, via)
1337    }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    #[cfg(feature = "sim")]
1343    use stageleft::q;
1344
1345    #[cfg(feature = "sim")]
1346    use crate::live_collections::sliced::sliced;
1347    #[cfg(feature = "sim")]
1348    use crate::location::{Location, MemberId};
1349    #[cfg(feature = "sim")]
1350    use crate::networking::TCP;
1351    #[cfg(feature = "sim")]
1352    use crate::nondet::nondet;
1353    #[cfg(feature = "sim")]
1354    use crate::prelude::FlowBuilder;
1355
1356    #[cfg(feature = "sim")]
1357    #[test]
1358    fn sim_send_bincode_o2o() {
1359        use crate::networking::TCP;
1360
1361        let mut flow = FlowBuilder::new();
1362        let node = flow.process::<()>();
1363        let node2 = flow.process::<()>();
1364
1365        let (in_send, input) = node.sim_input();
1366
1367        let out_recv = input
1368            .send(&node2, TCP.fail_stop().bincode())
1369            .batch(&node2.tick(), nondet!(/** test */))
1370            .count()
1371            .all_ticks()
1372            .sim_output();
1373
1374        let instances = flow.sim().exhaustive(async || {
1375            in_send.send(());
1376            in_send.send(());
1377            in_send.send(());
1378
1379            let received = out_recv.collect::<Vec<_>>().await;
1380            assert!(received.into_iter().sum::<usize>() == 3);
1381        });
1382
1383        assert_eq!(instances, 4); // 2^{3 - 1}
1384    }
1385
1386    #[cfg(feature = "sim")]
1387    #[test]
1388    fn sim_send_bincode_m2o() {
1389        let mut flow = FlowBuilder::new();
1390        let cluster = flow.cluster::<()>();
1391        let node = flow.process::<()>();
1392
1393        let input = cluster.source_iter(q!(vec![1]));
1394
1395        let out_recv = input
1396            .send(&node, TCP.fail_stop().bincode())
1397            .entries()
1398            .batch(&node.tick(), nondet!(/** test */))
1399            .all_ticks()
1400            .sim_output();
1401
1402        let instances = flow
1403            .sim()
1404            .with_cluster_size(&cluster, 4)
1405            .exhaustive(async || {
1406                out_recv
1407                    .assert_yields_only_unordered(vec![
1408                        (MemberId::from_raw_id(0), 1),
1409                        (MemberId::from_raw_id(1), 1),
1410                        (MemberId::from_raw_id(2), 1),
1411                        (MemberId::from_raw_id(3), 1),
1412                    ])
1413                    .await
1414            });
1415
1416        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1417    }
1418
1419    #[cfg(feature = "sim")]
1420    #[test]
1421    fn sim_send_bincode_multiple_m2o() {
1422        let mut flow = FlowBuilder::new();
1423        let cluster1 = flow.cluster::<()>();
1424        let cluster2 = flow.cluster::<()>();
1425        let node = flow.process::<()>();
1426
1427        let out_recv_1 = cluster1
1428            .source_iter(q!(vec![1]))
1429            .send(&node, TCP.fail_stop().bincode())
1430            .entries()
1431            .sim_output();
1432
1433        let out_recv_2 = cluster2
1434            .source_iter(q!(vec![2]))
1435            .send(&node, TCP.fail_stop().bincode())
1436            .entries()
1437            .sim_output();
1438
1439        let instances = flow
1440            .sim()
1441            .with_cluster_size(&cluster1, 3)
1442            .with_cluster_size(&cluster2, 4)
1443            .exhaustive(async || {
1444                out_recv_1
1445                    .assert_yields_only_unordered(vec![
1446                        (MemberId::from_raw_id(0), 1),
1447                        (MemberId::from_raw_id(1), 1),
1448                        (MemberId::from_raw_id(2), 1),
1449                    ])
1450                    .await;
1451
1452                out_recv_2
1453                    .assert_yields_only_unordered(vec![
1454                        (MemberId::from_raw_id(0), 2),
1455                        (MemberId::from_raw_id(1), 2),
1456                        (MemberId::from_raw_id(2), 2),
1457                        (MemberId::from_raw_id(3), 2),
1458                    ])
1459                    .await;
1460            });
1461
1462        assert_eq!(instances, 1);
1463    }
1464
1465    #[cfg(feature = "sim")]
1466    #[test]
1467    fn sim_send_bincode_o2m() {
1468        let mut flow = FlowBuilder::new();
1469        let cluster = flow.cluster::<()>();
1470        let node = flow.process::<()>();
1471
1472        let input = node.source_iter(q!(vec![
1473            (MemberId::from_raw_id(0), 123),
1474            (MemberId::from_raw_id(1), 456),
1475        ]));
1476
1477        let out_recv = input
1478            .demux(&cluster, TCP.fail_stop().bincode())
1479            .map(q!(|x| x + 1))
1480            .send(&node, TCP.fail_stop().bincode())
1481            .entries()
1482            .sim_output();
1483
1484        flow.sim()
1485            .with_cluster_size(&cluster, 4)
1486            .exhaustive(async || {
1487                out_recv
1488                    .assert_yields_only_unordered(vec![
1489                        (MemberId::from_raw_id(0), 124),
1490                        (MemberId::from_raw_id(1), 457),
1491                    ])
1492                    .await
1493            });
1494    }
1495
1496    #[cfg(feature = "sim")]
1497    #[test]
1498    fn sim_broadcast_bincode_o2m() {
1499        let mut flow = FlowBuilder::new();
1500        let cluster = flow.cluster::<()>();
1501        let node = flow.process::<()>();
1502
1503        let input = node.source_iter(q!(vec![123, 456]));
1504
1505        let out_recv = input
1506            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1507            .map(q!(|x| x + 1))
1508            .send(&node, TCP.fail_stop().bincode())
1509            .entries()
1510            .sim_output();
1511
1512        let mut c_1_produced = false;
1513        let mut c_2_produced = false;
1514        let mut c_1_saw_457_but_not_124 = false;
1515
1516        flow.sim()
1517            .with_cluster_size(&cluster, 2)
1518            .exhaustive(async || {
1519                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1520
1521                // check that order is preserved
1522                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1523                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1524                    c_1_produced = true;
1525                }
1526
1527                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1528                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1529                    c_2_produced = true;
1530                }
1531
1532                if all_out.contains(&(MemberId::from_raw_id(0), 457))
1533                    && !all_out.contains(&(MemberId::from_raw_id(0), 124))
1534                {
1535                    c_1_saw_457_but_not_124 = true;
1536                }
1537            });
1538
1539        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1540
1541        // in at least one execution, the cluster member received 457 but not 124, this tests
1542        // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
1543        assert!(c_1_saw_457_but_not_124);
1544    }
1545
1546    #[cfg(feature = "sim")]
1547    #[test]
1548    fn sim_send_bincode_m2m() {
1549        let mut flow = FlowBuilder::new();
1550        let cluster = flow.cluster::<()>();
1551        let node = flow.process::<()>();
1552
1553        let input = node.source_iter(q!(vec![
1554            (MemberId::from_raw_id(0), 123),
1555            (MemberId::from_raw_id(1), 456),
1556        ]));
1557
1558        let out_recv = input
1559            .demux(&cluster, TCP.fail_stop().bincode())
1560            .map(q!(|x| x + 1))
1561            .flat_map_ordered(q!(|x| vec![
1562                (MemberId::from_raw_id(0), x),
1563                (MemberId::from_raw_id(1), x),
1564            ]))
1565            .demux(&cluster, TCP.fail_stop().bincode())
1566            .entries()
1567            .send(&node, TCP.fail_stop().bincode())
1568            .entries()
1569            .sim_output();
1570
1571        flow.sim()
1572            .with_cluster_size(&cluster, 4)
1573            .exhaustive(async || {
1574                out_recv
1575                    .assert_yields_only_unordered(vec![
1576                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1577                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1578                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1579                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1580                    ])
1581                    .await
1582            });
1583    }
1584
1585    #[cfg(feature = "sim")]
1586    #[test]
1587    fn sim_lossy_delayed_forever_o2o() {
1588        use std::collections::HashSet;
1589
1590        use crate::properties::manual_proof;
1591
1592        let mut flow = FlowBuilder::new();
1593        let node = flow.process::<()>();
1594        let node2 = flow.process::<()>();
1595
1596        let received = node
1597            .source_iter(q!(0..3_u32))
1598            .send(&node2, TCP.lossy_delayed_forever().bincode())
1599            .fold(
1600                q!(|| std::collections::HashSet::<u32>::new()),
1601                q!(
1602                    |set, v| {
1603                        set.insert(v);
1604                    },
1605                    commutative = manual_proof!(/** set insert is commutative */)
1606                ),
1607            );
1608
1609        let out_recv = sliced! {
1610            let snapshot = use(received, nondet!(/** test */));
1611            snapshot.into_stream()
1612        }
1613        .sim_output();
1614
1615        let mut saw_non_contiguous = false;
1616
1617        flow.sim().test_safety_only().exhaustive(async || {
1618            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1619
1620            // Check each individual snapshot for a non-contiguous subset.
1621            for set in &snapshots {
1622                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1623                if set.len() >= 2 && set.len() < 3 {
1624                    let min = *set.iter().min().unwrap();
1625                    let max = *set.iter().max().unwrap();
1626                    if set.len() < (max - min + 1) as usize {
1627                        saw_non_contiguous = true;
1628                    }
1629                }
1630            }
1631        });
1632
1633        assert!(
1634            saw_non_contiguous,
1635            "Expected at least one execution with a non-contiguous subset of inputs"
1636        );
1637    }
1638
1639    #[cfg(feature = "sim")]
1640    #[test]
1641    fn sim_broadcast_closed_o2m() {
1642        let mut flow = FlowBuilder::new();
1643        let cluster = flow.cluster::<()>();
1644        let node = flow.process::<()>();
1645
1646        let input = node.source_iter(q!(vec![123, 456]));
1647
1648        let out_recv = input
1649            .broadcast_closed(&cluster, TCP.fail_stop().bincode())
1650            .send(&node, TCP.fail_stop().bincode())
1651            .entries()
1652            .sim_output();
1653
1654        flow.sim()
1655            .with_cluster_size(&cluster, 2)
1656            .exhaustive(async || {
1657                out_recv
1658                    .assert_yields_only_unordered(vec![
1659                        (MemberId::from_raw_id(0), 123),
1660                        (MemberId::from_raw_id(0), 456),
1661                        (MemberId::from_raw_id(1), 123),
1662                        (MemberId::from_raw_id(1), 456),
1663                    ])
1664                    .await
1665            });
1666    }
1667}