Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::{KeyedSingleton, MonotonicKeys};
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::{ClusterIds, Consistency, EventualConsistency, NoConsistency};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26use crate::properties::manual_proof;
27#[cfg(feature = "sim")]
28use crate::sim::SimReceiver;
29use crate::staging_util::get_this_crate;
30
31// same as the one in `hydro_std`, but internal use only
32fn track_membership<'a, C, L: Location<'a>>(
33    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
34) -> KeyedSingleton<MemberId<C>, bool, L, MonotonicKeys> {
35    membership.fold(
36        q!(|| false),
37        q!(|present, event| {
38            match event {
39                MembershipEvent::Joined => *present = true,
40                MembershipEvent::Left => *present = false,
41            }
42        }),
43    )
44}
45
46fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
47    let root = get_this_crate();
48
49    if is_demux {
50        parse_quote! {
51            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
52                |(id, data)| {
53                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
54                }
55            )
56        }
57    } else {
58        parse_quote! {
59            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
60                |data| {
61                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
62                }
63            )
64        }
65    }
66}
67
68pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
69    serialize_bincode_with_type(is_demux, &quote_type::<T>())
70}
71
72fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
73    let root = get_this_crate();
74    if let Some(c_type) = tagged {
75        parse_quote! {
76            |res| {
77                let (id, b) = res.unwrap();
78                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
79            }
80        }
81    } else {
82        parse_quote! {
83            |res| {
84                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
85            }
86        }
87    }
88}
89
90pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
91    deserialize_bincode_with_type(tagged, &quote_type::<T>())
92}
93
94impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
95    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
96    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
97    /// using [`bincode`] to serialize/deserialize messages.
98    ///
99    /// The returned stream captures the elements received at the destination, where values will
100    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
101    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
103    /// dropped no further messages will be sent.
104    ///
105    /// # Example
106    /// ```rust
107    /// # #[cfg(feature = "deploy")] {
108    /// # use hydro_lang::prelude::*;
109    /// # use futures::StreamExt;
110    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
111    /// let p1 = flow.process::<()>();
112    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
113    /// let p2 = flow.process::<()>();
114    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
115    /// // 1, 2, 3
116    /// # on_p2.send_bincode(&p_out)
117    /// # }, |mut stream| async move {
118    /// # for w in 1..=3 {
119    /// #     assert_eq!(stream.next().await, Some(w));
120    /// # }
121    /// # }));
122    /// # }
123    /// ```
124    pub fn send_bincode<L2>(
125        self,
126        other: &Process<'a, L2>,
127    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
128    where
129        T: Serialize + DeserializeOwned,
130    {
131        self.send(other, TCP.fail_stop().bincode())
132    }
133
134    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
135    /// using the configuration in `via` to set up the message transport.
136    ///
137    /// The returned stream captures the elements received at the destination, where values will
138    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
139    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
141    /// dropped no further messages will be sent.
142    ///
143    /// # Example
144    /// ```rust
145    /// # #[cfg(feature = "deploy")] {
146    /// # use hydro_lang::prelude::*;
147    /// # use futures::StreamExt;
148    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
149    /// let p1 = flow.process::<()>();
150    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
151    /// let p2 = flow.process::<()>();
152    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
153    /// // 1, 2, 3
154    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
155    /// # }, |mut stream| async move {
156    /// # for w in 1..=3 {
157    /// #     assert_eq!(stream.next().await, Some(w));
158    /// # }
159    /// # }));
160    /// # }
161    /// ```
162    pub fn send<L2, N: NetworkFor<T>>(
163        self,
164        to: &Process<'a, L2>,
165        via: N,
166    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
167    where
168        T: Serialize + DeserializeOwned,
169        O: MinOrder<N::OrderingGuarantee>,
170    {
171        let serialize_pipeline = Some(N::serialize_thunk(false));
172        let deserialize_pipeline = Some(N::deserialize_thunk(None));
173
174        let name = via.name();
175        if to.multiversioned() && name.is_none() {
176            panic!(
177                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
178            );
179        }
180
181        Stream::new(
182            to.clone(),
183            HydroNode::Network {
184                name: name.map(ToOwned::to_owned),
185                networking_info: N::networking_info(),
186                serialize_fn: serialize_pipeline.map(|e| e.into()),
187                instantiate_fn: DebugInstantiate::Building,
188                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
189                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
190                metadata: to.new_node_metadata(Stream::<
191                    T,
192                    Process<'a, L2>,
193                    Unbounded,
194                    <O as MinOrder<N::OrderingGuarantee>>::Min,
195                    R,
196                >::collection_kind()),
197            },
198        )
199    }
200
201    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
202    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
203    /// using [`bincode`] to serialize/deserialize messages.
204    ///
205    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
206    /// membership information. This is a common pattern in distributed systems for broadcasting data to
207    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
208    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
209    /// each element to all cluster members.
210    ///
211    /// # Non-Determinism
212    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
213    /// to the current cluster members _at that point in time_. Depending on when we are notified of
214    /// membership changes, we will broadcast each element to different members.
215    ///
216    /// # Example
217    /// ```rust
218    /// # #[cfg(feature = "deploy")] {
219    /// # use hydro_lang::prelude::*;
220    /// # use futures::StreamExt;
221    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
222    /// let p1 = flow.process::<()>();
223    /// let workers: Cluster<()> = flow.cluster::<()>();
224    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
225    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
226    /// # on_worker.send_bincode(&p2).entries()
227    /// // if there are 4 members in the cluster, each receives one element
228    /// // - MemberId::<()>(0): [123]
229    /// // - MemberId::<()>(1): [123]
230    /// // - MemberId::<()>(2): [123]
231    /// // - MemberId::<()>(3): [123]
232    /// # }, |mut stream| async move {
233    /// # let mut results = Vec::new();
234    /// # for w in 0..4 {
235    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
236    /// # }
237    /// # results.sort();
238    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
239    /// # }));
240    /// # }
241    /// ```
242    pub fn broadcast_bincode<L2: 'a>(
243        self,
244        other: &Cluster<'a, L2>,
245        nondet_membership: NonDet,
246    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
247    where
248        T: Clone + Serialize + DeserializeOwned,
249    {
250        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
251    }
252
253    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
254    /// using the configuration in `via` to set up the message transport.
255    ///
256    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
257    /// membership information. This is a common pattern in distributed systems for broadcasting data to
258    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
259    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
260    /// each element to all cluster members.
261    ///
262    /// # Non-Determinism
263    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
264    /// to the current cluster members _at that point in time_. Depending on when we are notified of
265    /// membership changes, we will broadcast each element to different members.
266    ///
267    /// # Example
268    /// ```rust
269    /// # #[cfg(feature = "deploy")] {
270    /// # use hydro_lang::prelude::*;
271    /// # use futures::StreamExt;
272    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273    /// let p1 = flow.process::<()>();
274    /// let workers: Cluster<()> = flow.cluster::<()>();
275    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
276    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
277    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
278    /// // if there are 4 members in the cluster, each receives one element
279    /// // - MemberId::<()>(0): [123]
280    /// // - MemberId::<()>(1): [123]
281    /// // - MemberId::<()>(2): [123]
282    /// // - MemberId::<()>(3): [123]
283    /// # }, |mut stream| async move {
284    /// # let mut results = Vec::new();
285    /// # for w in 0..4 {
286    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
287    /// # }
288    /// # results.sort();
289    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
290    /// # }));
291    /// # }
292    /// ```
293    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
294        self,
295        to: &Cluster<'a, L2>,
296        via: N,
297        nondet_membership: NonDet,
298    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
299    where
300        T: Clone + Serialize + DeserializeOwned,
301        O: MinOrder<N::OrderingGuarantee>,
302    {
303        let ids = track_membership(self.location.source_cluster_membership_stream(
304            to,
305            nondet!(/** dropped prefixes don't affect broadcast */),
306        ));
307        sliced! {
308            let members_snapshot = use(ids, nondet_membership);
309            let elements = use(self, nondet_membership);
310
311            let current_members = members_snapshot.filter(q!(|b| *b));
312            elements.repeat_with_keys(current_members)
313        }
314        .demux(to, via)
315    }
316
317    /// Broadcasts elements of this stream to all members of a cluster,
318    /// assuming membership is closed (fixed at deploy time).
319    ///
320    /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
321    /// The membership set is obtained from deploy metadata via
322    /// [`ClusterIds`], producing a
323    /// `Bounded` stream. The cross-product of data × members is fully
324    /// deterministic.
325    ///
326    /// This is only available in deployment targets with static cluster
327    /// membership (legacy Hydro Deploy and simulation). There are no late
328    /// joiners in that context, so broadcast receivers are guaranteed to
329    /// get data from the start of the stream. On dynamic targets
330    /// (e.g. ECS), use [`Stream::broadcast`] instead.
331    ///
332    /// # Example
333    /// ```rust
334    /// # #[cfg(feature = "deploy")] {
335    /// # use hydro_lang::prelude::*;
336    /// # use futures::StreamExt;
337    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
338    /// let p1 = flow.process::<()>();
339    /// let workers: Cluster<()> = flow.cluster::<()>();
340    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
341    /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
342    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
343    /// // each of the 4 cluster members receives 123
344    /// # }, |mut stream| async move {
345    /// # let mut results = Vec::new();
346    /// # for _ in 0..4 {
347    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
348    /// # }
349    /// # results.sort();
350    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
351    /// # }));
352    /// # }
353    /// ```
354    pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
355        self,
356        to: &Cluster<'a, L2>,
357        via: N,
358    ) -> Stream<
359        T,
360        Cluster<'a, L2, EventualConsistency>,
361        Unbounded,
362        <O as MinOrder<N::OrderingGuarantee>>::Min,
363        R,
364    >
365    where
366        T: Clone + Serialize + DeserializeOwned,
367        O: MinOrder<N::OrderingGuarantee>,
368    {
369        let cluster_ids = ClusterIds {
370            key: to.key,
371            _phantom: PhantomData,
372        };
373        let member_ids = self.location.source_iter(q!(cluster_ids
374            .iter()
375            .map(|id| MemberId::from_tagless(id.clone()))));
376
377        // Late joiners will receive no data from this broadcast, which is
378        // future-monotone and eventually consistent (a safe under-approximation).
379        self.cross_product(member_ids)
380            .map(q!(|(data, member_id)| (member_id, data)))
381            .into_keyed()
382            .demux(to, via)
383            .assert_has_consistency_of_trusted(manual_proof!(/** closed broadcast will materialze the same elements on each member */))
384    }
385
386    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
387    /// serialization. The external process can receive these elements by establishing a TCP
388    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
389    ///
390    /// # Example
391    /// ```rust
392    /// # #[cfg(feature = "deploy")] {
393    /// # use hydro_lang::prelude::*;
394    /// # use futures::StreamExt;
395    /// # tokio_test::block_on(async move {
396    /// let mut flow = FlowBuilder::new();
397    /// let process = flow.process::<()>();
398    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
399    /// let external = flow.external::<()>();
400    /// let external_handle = numbers.send_bincode_external(&external);
401    ///
402    /// let mut deployment = hydro_deploy::Deployment::new();
403    /// let nodes = flow
404    ///     .with_process(&process, deployment.Localhost())
405    ///     .with_external(&external, deployment.Localhost())
406    ///     .deploy(&mut deployment);
407    ///
408    /// deployment.deploy().await.unwrap();
409    /// // establish the TCP connection
410    /// let mut external_recv_stream = nodes.connect(external_handle).await;
411    /// deployment.start().await.unwrap();
412    ///
413    /// for w in 1..=3 {
414    ///     assert_eq!(external_recv_stream.next().await, Some(w));
415    /// }
416    /// # });
417    /// # }
418    /// ```
419    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
420    where
421        T: Serialize + DeserializeOwned,
422    {
423        let serialize_pipeline = Some(serialize_bincode::<T>(false));
424
425        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
426
427        let external_port_id = flow_state_borrow.next_external_port();
428
429        flow_state_borrow.push_root(HydroRoot::SendExternal {
430            to_external_key: other.key,
431            to_port_id: external_port_id,
432            to_many: false,
433            unpaired: true,
434            serialize_fn: serialize_pipeline.map(|e| e.into()),
435            instantiate_fn: DebugInstantiate::Building,
436            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
437            op_metadata: HydroIrOpMetadata::new(),
438        });
439
440        ExternalBincodeStream {
441            process_key: other.key,
442            port_id: external_port_id,
443            _phantom: PhantomData,
444        }
445    }
446
447    #[cfg(feature = "sim")]
448    /// Sets up a simulation output port for this stream, allowing test code to receive elements
449    /// sent to this stream during simulation.
450    pub fn sim_output(self) -> SimReceiver<T, O, R>
451    where
452        T: Serialize + DeserializeOwned,
453    {
454        let external_location: External<'a, ()> = External {
455            key: LocationKey::FIRST,
456            flow_state: self.location.flow_state().clone(),
457            _phantom: PhantomData,
458        };
459
460        let external = self.send_bincode_external(&external_location);
461
462        SimReceiver(external.port_id, PhantomData)
463    }
464}
465
466impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
467    /// Creates an external output for embedded deployment mode.
468    ///
469    /// The `name` parameter specifies the name of the field in the generated
470    /// `EmbeddedOutputs` struct that will receive elements from this stream.
471    /// The generated function will accept an `EmbeddedOutputs` struct with an
472    /// `impl FnMut(T)` field with this name.
473    pub fn embedded_output(self, name: impl Into<String>) {
474        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
475
476        self.location
477            .flow_state()
478            .borrow_mut()
479            .push_root(HydroRoot::EmbeddedOutput {
480                ident,
481                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
482                op_metadata: HydroIrOpMetadata::new(),
483            });
484    }
485}
486
487impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
488    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
489{
490    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
491    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
492    /// using [`bincode`] to serialize/deserialize messages.
493    ///
494    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
495    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
496    /// this API allows precise targeting of specific cluster members rather than broadcasting to
497    /// all members.
498    ///
499    /// # Example
500    /// ```rust
501    /// # #[cfg(feature = "deploy")] {
502    /// # use hydro_lang::prelude::*;
503    /// # use futures::StreamExt;
504    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
505    /// let p1 = flow.process::<()>();
506    /// let workers: Cluster<()> = flow.cluster::<()>();
507    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
508    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
509    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
510    ///     .demux_bincode(&workers);
511    /// # on_worker.send_bincode(&p2).entries()
512    /// // if there are 4 members in the cluster, each receives one element
513    /// // - MemberId::<()>(0): [0]
514    /// // - MemberId::<()>(1): [1]
515    /// // - MemberId::<()>(2): [2]
516    /// // - MemberId::<()>(3): [3]
517    /// # }, |mut stream| async move {
518    /// # let mut results = Vec::new();
519    /// # for w in 0..4 {
520    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
521    /// # }
522    /// # results.sort();
523    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
524    /// # }));
525    /// # }
526    /// ```
527    pub fn demux_bincode(
528        self,
529        other: &Cluster<'a, L2>,
530    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
531    where
532        T: Serialize + DeserializeOwned,
533    {
534        self.demux(other, TCP.fail_stop().bincode())
535    }
536
537    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
538    /// using the configuration in `via` to set up the message transport.
539    ///
540    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
541    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
542    /// this API allows precise targeting of specific cluster members rather than broadcasting to
543    /// all members.
544    ///
545    /// # Example
546    /// ```rust
547    /// # #[cfg(feature = "deploy")] {
548    /// # use hydro_lang::prelude::*;
549    /// # use futures::StreamExt;
550    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
551    /// let p1 = flow.process::<()>();
552    /// let workers: Cluster<()> = flow.cluster::<()>();
553    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
554    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
555    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
556    ///     .demux(&workers, TCP.fail_stop().bincode());
557    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
558    /// // if there are 4 members in the cluster, each receives one element
559    /// // - MemberId::<()>(0): [0]
560    /// // - MemberId::<()>(1): [1]
561    /// // - MemberId::<()>(2): [2]
562    /// // - MemberId::<()>(3): [3]
563    /// # }, |mut stream| async move {
564    /// # let mut results = Vec::new();
565    /// # for w in 0..4 {
566    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
567    /// # }
568    /// # results.sort();
569    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
570    /// # }));
571    /// # }
572    /// ```
573    pub fn demux<N: NetworkFor<T>>(
574        self,
575        to: &Cluster<'a, L2>,
576        via: N,
577    ) -> Stream<
578        T,
579        Cluster<'a, L2, NoConsistency>,
580        Unbounded,
581        <O as MinOrder<N::OrderingGuarantee>>::Min,
582        R,
583    >
584    where
585        T: Serialize + DeserializeOwned,
586        O: MinOrder<N::OrderingGuarantee>,
587    {
588        self.into_keyed().demux(to, via)
589    }
590}
591
592impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
593    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
594    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
595    /// [`bincode`] to serialize/deserialize messages.
596    ///
597    /// This provides load balancing by evenly distributing work across cluster members. The
598    /// distribution is deterministic based on element order - the first element goes to member 0,
599    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
600    ///
601    /// # Non-Determinism
602    /// The set of cluster members may asynchronously change over time. Each element is distributed
603    /// based on the current cluster membership _at that point in time_. Depending on when cluster
604    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
605    /// membership is stable, the order of members in the round-robin pattern may change across runs.
606    ///
607    /// # Ordering Requirements
608    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
609    /// order of messages and retries affects the round-robin pattern.
610    ///
611    /// # Example
612    /// ```rust
613    /// # #[cfg(feature = "deploy")] {
614    /// # use hydro_lang::prelude::*;
615    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
616    /// # use futures::StreamExt;
617    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
618    /// let p1 = flow.process::<()>();
619    /// let workers: Cluster<()> = flow.cluster::<()>();
620    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
621    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
622    /// on_worker.send_bincode(&p2)
623    /// # .first().values() // we use first to assert that each member gets one element
624    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
625    /// // - MemberId::<()>(?): [1]
626    /// // - MemberId::<()>(?): [2]
627    /// // - MemberId::<()>(?): [3]
628    /// // - MemberId::<()>(?): [4]
629    /// # }, |mut stream| async move {
630    /// # let mut results = Vec::new();
631    /// # for w in 0..4 {
632    /// #     results.push(stream.next().await.unwrap());
633    /// # }
634    /// # results.sort();
635    /// # assert_eq!(results, vec![1, 2, 3, 4]);
636    /// # }));
637    /// # }
638    /// ```
639    pub fn round_robin_bincode<L2: 'a>(
640        self,
641        other: &Cluster<'a, L2>,
642        nondet_membership: NonDet,
643    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
644    where
645        T: Serialize + DeserializeOwned,
646    {
647        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
648    }
649
650    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
651    /// the configuration in `via` to set up the message transport.
652    ///
653    /// This provides load balancing by evenly distributing work across cluster members. The
654    /// distribution is deterministic based on element order - the first element goes to member 0,
655    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
656    ///
657    /// # Non-Determinism
658    /// The set of cluster members may asynchronously change over time. Each element is distributed
659    /// based on the current cluster membership _at that point in time_. Depending on when cluster
660    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
661    /// membership is stable, the order of members in the round-robin pattern may change across runs.
662    ///
663    /// # Ordering Requirements
664    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
665    /// order of messages and retries affects the round-robin pattern.
666    ///
667    /// # Example
668    /// ```rust
669    /// # #[cfg(feature = "deploy")] {
670    /// # use hydro_lang::prelude::*;
671    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
674    /// let p1 = flow.process::<()>();
675    /// let workers: Cluster<()> = flow.cluster::<()>();
676    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
677    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
678    /// on_worker.send(&p2, TCP.fail_stop().bincode())
679    /// # .first().values() // we use first to assert that each member gets one element
680    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
681    /// // - MemberId::<()>(?): [1]
682    /// // - MemberId::<()>(?): [2]
683    /// // - MemberId::<()>(?): [3]
684    /// // - MemberId::<()>(?): [4]
685    /// # }, |mut stream| async move {
686    /// # let mut results = Vec::new();
687    /// # for w in 0..4 {
688    /// #     results.push(stream.next().await.unwrap());
689    /// # }
690    /// # results.sort();
691    /// # assert_eq!(results, vec![1, 2, 3, 4]);
692    /// # }));
693    /// # }
694    /// ```
695    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
696        self,
697        to: &Cluster<'a, L2>,
698        via: N,
699        nondet_membership: NonDet,
700    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
701    where
702        T: Serialize + DeserializeOwned,
703    {
704        let ids = track_membership(self.location.source_cluster_membership_stream(
705            to,
706            nondet!(/** dropped prefixes don't affect broadcast */),
707        ));
708        sliced! {
709            let members_snapshot = use(ids, nondet_membership);
710            let elements = use(self.enumerate(), nondet_membership);
711
712            let current_members = members_snapshot
713                .filter(q!(|b| *b))
714                .keys()
715                .assume_ordering::<TotalOrder>(nondet_membership)
716                .collect_vec();
717
718            elements
719                .cross_singleton(current_members)
720                .filter_map(q!(|(data, members)| {
721                    if members.is_empty() {
722                        None
723                    } else {
724                        Some((members[data.0 % members.len()].clone(), data.1))
725                    }
726                }))
727        }
728        .demux(to, via)
729    }
730}
731
732impl<'a, T, L, B: Boundedness, C: Consistency>
733    Stream<T, Cluster<'a, L, C>, B, TotalOrder, ExactlyOnce>
734{
735    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
736    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
737    /// [`bincode`] to serialize/deserialize messages.
738    ///
739    /// This provides load balancing by evenly distributing work across cluster members. The
740    /// distribution is deterministic based on element order - the first element goes to member 0,
741    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
742    ///
743    /// # Non-Determinism
744    /// The set of cluster members may asynchronously change over time. Each element is distributed
745    /// based on the current cluster membership _at that point in time_. Depending on when cluster
746    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
747    /// membership is stable, the order of members in the round-robin pattern may change across runs.
748    ///
749    /// # Ordering Requirements
750    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
751    /// order of messages and retries affects the round-robin pattern.
752    ///
753    /// # Example
754    /// ```rust
755    /// # #[cfg(feature = "deploy")] {
756    /// # use hydro_lang::prelude::*;
757    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
758    /// # use hydro_lang::location::MemberId;
759    /// # use futures::StreamExt;
760    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
761    /// let p1 = flow.process::<()>();
762    /// let workers1: Cluster<()> = flow.cluster::<()>();
763    /// let workers2: Cluster<()> = flow.cluster::<()>();
764    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
765    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
766    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
767    /// on_worker2.send_bincode(&p2)
768    /// # .entries()
769    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
770    /// # }, |mut stream| async move {
771    /// # let mut results = Vec::new();
772    /// # let mut locations = std::collections::HashSet::new();
773    /// # for w in 0..=16 {
774    /// #     let (location, v) = stream.next().await.unwrap();
775    /// #     locations.insert(location);
776    /// #     results.push(v);
777    /// # }
778    /// # results.sort();
779    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
780    /// # assert_eq!(locations.len(), 16);
781    /// # }));
782    /// # }
783    /// ```
784    pub fn round_robin_bincode<L2: 'a>(
785        self,
786        other: &Cluster<'a, L2>,
787        nondet_membership: NonDet,
788    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
789    where
790        T: Serialize + DeserializeOwned,
791    {
792        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
793    }
794
795    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
796    /// the configuration in `via` to set up the message transport.
797    ///
798    /// This provides load balancing by evenly distributing work across cluster members. The
799    /// distribution is deterministic based on element order - the first element goes to member 0,
800    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
801    ///
802    /// # Non-Determinism
803    /// The set of cluster members may asynchronously change over time. Each element is distributed
804    /// based on the current cluster membership _at that point in time_. Depending on when cluster
805    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
806    /// membership is stable, the order of members in the round-robin pattern may change across runs.
807    ///
808    /// # Ordering Requirements
809    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
810    /// order of messages and retries affects the round-robin pattern.
811    ///
812    /// # Example
813    /// ```rust
814    /// # #[cfg(feature = "deploy")] {
815    /// # use hydro_lang::prelude::*;
816    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
817    /// # use hydro_lang::location::MemberId;
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
820    /// let p1 = flow.process::<()>();
821    /// let workers1: Cluster<()> = flow.cluster::<()>();
822    /// let workers2: Cluster<()> = flow.cluster::<()>();
823    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
824    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
825    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
826    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
827    /// # .entries()
828    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
829    /// # }, |mut stream| async move {
830    /// # let mut results = Vec::new();
831    /// # let mut locations = std::collections::HashSet::new();
832    /// # for w in 0..=16 {
833    /// #     let (location, v) = stream.next().await.unwrap();
834    /// #     locations.insert(location);
835    /// #     results.push(v);
836    /// # }
837    /// # results.sort();
838    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
839    /// # assert_eq!(locations.len(), 16);
840    /// # }));
841    /// # }
842    /// ```
843    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
844        self,
845        to: &Cluster<'a, L2>,
846        via: N,
847        nondet_membership: NonDet,
848    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
849    where
850        T: Serialize + DeserializeOwned,
851    {
852        let ids = track_membership(self.location.source_cluster_membership_stream(
853            to,
854            nondet!(/** dropped prefixes don't affect broadcast */),
855        ));
856        sliced! {
857            let members_snapshot = use(ids, nondet_membership);
858            let elements = use(self.enumerate(), nondet_membership);
859
860            let current_members = members_snapshot
861                .filter(q!(|b| *b))
862                .keys()
863                .assume_ordering::<TotalOrder>(nondet_membership)
864                .collect_vec();
865
866            elements
867                .cross_singleton(current_members)
868                .filter_map(q!(|(data, members)| {
869                    if members.is_empty() {
870                        None
871                    } else {
872                        Some((members[data.0 % members.len()].clone(), data.1))
873                    }
874                }))
875        }
876        .demux(to, via)
877    }
878}
879
880impl<'a, T, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
881    Stream<T, Cluster<'a, L, C>, B, O, R>
882{
883    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
884    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
885    /// using [`bincode`] to serialize/deserialize messages.
886    ///
887    /// Each cluster member sends its local stream elements, and they are collected at the destination
888    /// as a [`KeyedStream`] where keys identify the source cluster member.
889    ///
890    /// # Example
891    /// ```rust
892    /// # #[cfg(feature = "deploy")] {
893    /// # use hydro_lang::prelude::*;
894    /// # use futures::StreamExt;
895    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
896    /// let workers: Cluster<()> = flow.cluster::<()>();
897    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
898    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
899    /// # all_received.entries()
900    /// # }, |mut stream| async move {
901    /// // if there are 4 members in the cluster, we should receive 4 elements
902    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
903    /// # let mut results = Vec::new();
904    /// # for w in 0..4 {
905    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
906    /// # }
907    /// # results.sort();
908    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
909    /// # }));
910    /// # }
911    /// ```
912    ///
913    /// If you don't need to know the source for each element, you can use `.values()`
914    /// to get just the data:
915    /// ```rust
916    /// # #[cfg(feature = "deploy")] {
917    /// # use hydro_lang::prelude::*;
918    /// # use hydro_lang::live_collections::stream::NoOrder;
919    /// # use futures::StreamExt;
920    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
921    /// # let workers: Cluster<()> = flow.cluster::<()>();
922    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
923    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
924    /// # values
925    /// # }, |mut stream| async move {
926    /// # let mut results = Vec::new();
927    /// # for w in 0..4 {
928    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
929    /// # }
930    /// # results.sort();
931    /// // if there are 4 members in the cluster, we should receive 4 elements
932    /// // 1, 1, 1, 1
933    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
934    /// # }));
935    /// # }
936    /// ```
937    pub fn send_bincode<L2>(
938        self,
939        other: &Process<'a, L2>,
940    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
941    where
942        T: Serialize + DeserializeOwned,
943    {
944        self.send(other, TCP.fail_stop().bincode())
945    }
946
947    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
948    /// using the configuration in `via` to set up the message transport.
949    ///
950    /// Each cluster member sends its local stream elements, and they are collected at the destination
951    /// as a [`KeyedStream`] where keys identify the source cluster member.
952    ///
953    /// # Example
954    /// ```rust
955    /// # #[cfg(feature = "deploy")] {
956    /// # use hydro_lang::prelude::*;
957    /// # use futures::StreamExt;
958    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
959    /// let workers: Cluster<()> = flow.cluster::<()>();
960    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
961    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
962    /// # all_received.entries()
963    /// # }, |mut stream| async move {
964    /// // if there are 4 members in the cluster, we should receive 4 elements
965    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
966    /// # let mut results = Vec::new();
967    /// # for w in 0..4 {
968    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
969    /// # }
970    /// # results.sort();
971    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
972    /// # }));
973    /// # }
974    /// ```
975    ///
976    /// If you don't need to know the source for each element, you can use `.values()`
977    /// to get just the data:
978    /// ```rust
979    /// # #[cfg(feature = "deploy")] {
980    /// # use hydro_lang::prelude::*;
981    /// # use hydro_lang::live_collections::stream::NoOrder;
982    /// # use futures::StreamExt;
983    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
984    /// # let workers: Cluster<()> = flow.cluster::<()>();
985    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
986    /// let values: Stream<i32, _, _, NoOrder> =
987    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
988    /// # values
989    /// # }, |mut stream| async move {
990    /// # let mut results = Vec::new();
991    /// # for w in 0..4 {
992    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
993    /// # }
994    /// # results.sort();
995    /// // if there are 4 members in the cluster, we should receive 4 elements
996    /// // 1, 1, 1, 1
997    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
998    /// # }));
999    /// # }
1000    /// ```
1001    pub fn send<L2, N: NetworkFor<T>>(
1002        self,
1003        to: &Process<'a, L2>,
1004        via: N,
1005    ) -> KeyedStream<
1006        MemberId<L>,
1007        T,
1008        Process<'a, L2>,
1009        Unbounded,
1010        <O as MinOrder<N::OrderingGuarantee>>::Min,
1011        R,
1012    >
1013    where
1014        T: Serialize + DeserializeOwned,
1015        O: MinOrder<N::OrderingGuarantee>,
1016    {
1017        let serialize_pipeline = Some(N::serialize_thunk(false));
1018
1019        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
1020
1021        let name = via.name();
1022        if to.multiversioned() && name.is_none() {
1023            panic!(
1024                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
1025            );
1026        }
1027
1028        let raw_stream: Stream<
1029            (MemberId<L>, T),
1030            Process<'a, L2>,
1031            Unbounded,
1032            <O as MinOrder<N::OrderingGuarantee>>::Min,
1033            R,
1034        > = Stream::new(
1035            to.clone(),
1036            HydroNode::Network {
1037                name: name.map(ToOwned::to_owned),
1038                networking_info: N::networking_info(),
1039                serialize_fn: serialize_pipeline.map(|e| e.into()),
1040                instantiate_fn: DebugInstantiate::Building,
1041                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1042                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1043                metadata: to.new_node_metadata(Stream::<
1044                    (MemberId<L>, T),
1045                    Process<'a, L2>,
1046                    Unbounded,
1047                    <O as MinOrder<N::OrderingGuarantee>>::Min,
1048                    R,
1049                >::collection_kind()),
1050            },
1051        );
1052
1053        raw_stream.into_keyed()
1054    }
1055
1056    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1057    /// Broadcasts elements of this stream at each source member to all members of a destination
1058    /// cluster, using [`bincode`] to serialize/deserialize messages.
1059    ///
1060    /// Each source member sends each of its stream elements to **every** member of the cluster
1061    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1062    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1063    /// **only data elements** and sends each element to all cluster members.
1064    ///
1065    /// # Non-Determinism
1066    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1067    /// to the current cluster members known _at that point in time_ at the source member. Depending
1068    /// on when each source member is notified of membership changes, it will broadcast each element
1069    /// to different members.
1070    ///
1071    /// # Example
1072    /// ```rust
1073    /// # #[cfg(feature = "deploy")] {
1074    /// # use hydro_lang::prelude::*;
1075    /// # use hydro_lang::location::MemberId;
1076    /// # use futures::StreamExt;
1077    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1078    /// # type Source = ();
1079    /// # type Destination = ();
1080    /// let source: Cluster<Source> = flow.cluster::<Source>();
1081    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1082    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1083    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1084    /// # on_destination.entries().send_bincode(&p2).entries()
1085    /// // if there are 4 members in the destination, each receives one element from each source member
1086    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1087    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1088    /// // - ...
1089    /// # }, |mut stream| async move {
1090    /// # let mut results = Vec::new();
1091    /// # for w in 0..16 {
1092    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1093    /// # }
1094    /// # results.sort();
1095    /// # assert_eq!(results, vec![
1096    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1097    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1098    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1099    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1100    /// # ]);
1101    /// # }));
1102    /// # }
1103    /// ```
1104    pub fn broadcast_bincode<L2: 'a>(
1105        self,
1106        other: &Cluster<'a, L2>,
1107        nondet_membership: NonDet,
1108    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1109    where
1110        T: Clone + Serialize + DeserializeOwned,
1111    {
1112        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1113    }
1114
1115    /// Broadcasts elements of this stream at each source member to all members of a destination
1116    /// cluster, using the configuration in `via` to set up the message transport.
1117    ///
1118    /// Each source member sends each of its stream elements to **every** member of the cluster
1119    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1120    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1121    /// **only data elements** and sends each element to all cluster members.
1122    ///
1123    /// # Non-Determinism
1124    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1125    /// to the current cluster members known _at that point in time_ at the source member. Depending
1126    /// on when each source member is notified of membership changes, it will broadcast each element
1127    /// to different members.
1128    ///
1129    /// # Example
1130    /// ```rust
1131    /// # #[cfg(feature = "deploy")] {
1132    /// # use hydro_lang::prelude::*;
1133    /// # use hydro_lang::location::MemberId;
1134    /// # use futures::StreamExt;
1135    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1136    /// # type Source = ();
1137    /// # type Destination = ();
1138    /// let source: Cluster<Source> = flow.cluster::<Source>();
1139    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1140    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1141    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1142    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1143    /// // if there are 4 members in the destination, each receives one element from each source member
1144    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1145    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1146    /// // - ...
1147    /// # }, |mut stream| async move {
1148    /// # let mut results = Vec::new();
1149    /// # for w in 0..16 {
1150    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1151    /// # }
1152    /// # results.sort();
1153    /// # assert_eq!(results, vec![
1154    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1155    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1156    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1157    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1158    /// # ]);
1159    /// # }));
1160    /// # }
1161    /// ```
1162    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1163        self,
1164        to: &Cluster<'a, L2>,
1165        via: N,
1166        nondet_membership: NonDet,
1167    ) -> KeyedStream<
1168        MemberId<L>,
1169        T,
1170        Cluster<'a, L2>,
1171        Unbounded,
1172        <O as MinOrder<N::OrderingGuarantee>>::Min,
1173        R,
1174    >
1175    where
1176        T: Clone + Serialize + DeserializeOwned,
1177        O: MinOrder<N::OrderingGuarantee>,
1178    {
1179        let ids = track_membership(self.location.source_cluster_membership_stream(
1180            to,
1181            nondet!(/** dropped prefixes don't affect broadcast */),
1182        ));
1183        sliced! {
1184            let members_snapshot = use(ids, nondet_membership);
1185            let elements = use(self, nondet_membership);
1186
1187            let current_members = members_snapshot.filter(q!(|b| *b));
1188            elements.repeat_with_keys(current_members)
1189        }
1190        .demux(to, via)
1191    }
1192
1193    /// Broadcasts elements of this stream at each source member to all members of a destination
1194    /// cluster, assuming membership is closed (fixed at deploy time).
1195    ///
1196    /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
1197    /// The membership set is obtained from deploy metadata via [`ClusterIds`], making the
1198    /// broadcast fully deterministic. Since all source members send to all destination members
1199    /// and membership is fixed, every destination member receives the same set of elements
1200    /// from each source, guaranteeing [`EventualConsistency`].
1201    ///
1202    /// This is only available in deployment targets with static cluster membership
1203    /// (legacy Hydro Deploy and simulation). On dynamic targets, use [`Stream::broadcast`].
1204    pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
1205        self,
1206        to: &Cluster<'a, L2>,
1207        via: N,
1208    ) -> KeyedStream<
1209        MemberId<L>,
1210        T,
1211        Cluster<'a, L2, EventualConsistency>,
1212        Unbounded,
1213        <O as MinOrder<N::OrderingGuarantee>>::Min,
1214        R,
1215    >
1216    where
1217        T: Clone + Serialize + DeserializeOwned,
1218        O: MinOrder<N::OrderingGuarantee>,
1219    {
1220        let cluster_ids = ClusterIds {
1221            key: to.key,
1222            _phantom: PhantomData,
1223        };
1224        let member_ids = self
1225            .location
1226            .source_iter(q!(cluster_ids
1227                .iter()
1228                .map(|id| MemberId::from_tagless(id.clone()))))
1229            .assert_has_consistency_of_trusted::<Cluster<'a, L, C>>(manual_proof!(
1230                /// ClusterIds is deploy-time metadata, identical on every cluster member.
1231            ));
1232
1233        self.cross_product(member_ids)
1234            .map(q!(|(data, member_id)| (member_id, data)))
1235            .into_keyed()
1236            .demux(to, via)
1237            .assert_has_consistency_of_trusted(manual_proof!(
1238                /// Closed broadcast with fixed membership: every source member sends to every
1239                /// destination member, so all destinations materialize the same elements.
1240            ))
1241    }
1242
1243    #[cfg(feature = "sim")]
1244    /// Sends elements of this cluster stream to an external location using bincode serialization.
1245    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1246    where
1247        T: Serialize + DeserializeOwned,
1248    {
1249        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1250
1251        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1252
1253        let external_port_id = flow_state_borrow.next_external_port();
1254
1255        flow_state_borrow.push_root(HydroRoot::SendExternal {
1256            to_external_key: other.key,
1257            to_port_id: external_port_id,
1258            to_many: false,
1259            unpaired: true,
1260            serialize_fn: serialize_pipeline.map(|e| e.into()),
1261            instantiate_fn: DebugInstantiate::Building,
1262            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1263            op_metadata: HydroIrOpMetadata::new(),
1264        });
1265
1266        ExternalBincodeStream {
1267            process_key: other.key,
1268            port_id: external_port_id,
1269            _phantom: PhantomData,
1270        }
1271    }
1272
1273    #[cfg(feature = "sim")]
1274    /// Sets up a simulation output port for this cluster stream, allowing test code
1275    /// to receive `(member_id, T)` pairs during simulation.
1276    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1277    where
1278        T: Serialize + DeserializeOwned,
1279    {
1280        let external_location: External<'a, ()> = External {
1281            key: LocationKey::FIRST,
1282            flow_state: self.location.flow_state().clone(),
1283            _phantom: PhantomData,
1284        };
1285
1286        let external = self.send_bincode_external(&external_location);
1287
1288        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1289    }
1290}
1291
1292impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
1293    Stream<(MemberId<L2>, T), Cluster<'a, L, C>, B, O, R>
1294{
1295    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1296    /// Sends elements of this stream at each source member to specific members of a destination
1297    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1298    ///
1299    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1300    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1301    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1302    /// all members.
1303    ///
1304    /// Each cluster member sends its local stream elements, and they are collected at each
1305    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1306    ///
1307    /// # Example
1308    /// ```rust
1309    /// # #[cfg(feature = "deploy")] {
1310    /// # use hydro_lang::prelude::*;
1311    /// # use futures::StreamExt;
1312    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1313    /// # type Source = ();
1314    /// # type Destination = ();
1315    /// let source: Cluster<Source> = flow.cluster::<Source>();
1316    /// let to_send: Stream<_, Cluster<_>, _> = source
1317    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1318    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1319    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1320    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1321    /// # all_received.entries().send_bincode(&p2).entries()
1322    /// # }, |mut stream| async move {
1323    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1324    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1325    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1326    /// // - ...
1327    /// # let mut results = Vec::new();
1328    /// # for w in 0..16 {
1329    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1330    /// # }
1331    /// # results.sort();
1332    /// # assert_eq!(results, vec![
1333    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1334    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1335    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1336    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1337    /// # ]);
1338    /// # }));
1339    /// # }
1340    /// ```
1341    pub fn demux_bincode(
1342        self,
1343        other: &Cluster<'a, L2>,
1344    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1345    where
1346        T: Serialize + DeserializeOwned,
1347    {
1348        self.demux(other, TCP.fail_stop().bincode())
1349    }
1350
1351    /// Sends elements of this stream at each source member to specific members of a destination
1352    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1353    /// message transport.
1354    ///
1355    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1356    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1357    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1358    /// all members.
1359    ///
1360    /// Each cluster member sends its local stream elements, and they are collected at each
1361    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1362    ///
1363    /// # Example
1364    /// ```rust
1365    /// # #[cfg(feature = "deploy")] {
1366    /// # use hydro_lang::prelude::*;
1367    /// # use futures::StreamExt;
1368    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1369    /// # type Source = ();
1370    /// # type Destination = ();
1371    /// let source: Cluster<Source> = flow.cluster::<Source>();
1372    /// let to_send: Stream<_, Cluster<_>, _> = source
1373    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1374    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1375    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1376    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1377    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1378    /// # }, |mut stream| async move {
1379    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1380    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1381    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1382    /// // - ...
1383    /// # let mut results = Vec::new();
1384    /// # for w in 0..16 {
1385    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1386    /// # }
1387    /// # results.sort();
1388    /// # assert_eq!(results, vec![
1389    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1390    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1391    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1392    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1393    /// # ]);
1394    /// # }));
1395    /// # }
1396    /// ```
1397    pub fn demux<N: NetworkFor<T>>(
1398        self,
1399        to: &Cluster<'a, L2>,
1400        via: N,
1401    ) -> KeyedStream<
1402        MemberId<L>,
1403        T,
1404        Cluster<'a, L2, NoConsistency>,
1405        Unbounded,
1406        <O as MinOrder<N::OrderingGuarantee>>::Min,
1407        R,
1408    >
1409    where
1410        T: Serialize + DeserializeOwned,
1411        O: MinOrder<N::OrderingGuarantee>,
1412    {
1413        self.into_keyed().demux(to, via)
1414    }
1415}
1416
1417#[cfg(test)]
1418mod tests {
1419    #[cfg(feature = "sim")]
1420    use stageleft::q;
1421
1422    #[cfg(feature = "sim")]
1423    use crate::live_collections::sliced::sliced;
1424    #[cfg(feature = "sim")]
1425    use crate::location::{Location, MemberId};
1426    #[cfg(feature = "sim")]
1427    use crate::networking::TCP;
1428    #[cfg(feature = "sim")]
1429    use crate::nondet::nondet;
1430    #[cfg(feature = "sim")]
1431    use crate::prelude::FlowBuilder;
1432
1433    #[cfg(feature = "sim")]
1434    #[test]
1435    fn sim_send_bincode_o2o() {
1436        use crate::networking::TCP;
1437
1438        let mut flow = FlowBuilder::new();
1439        let node = flow.process::<()>();
1440        let node2 = flow.process::<()>();
1441
1442        let (in_send, input) = node.sim_input();
1443
1444        let out_recv = input
1445            .send(&node2, TCP.fail_stop().bincode())
1446            .batch(&node2.tick(), nondet!(/** test */))
1447            .count()
1448            .all_ticks()
1449            .sim_output();
1450
1451        let instances = flow.sim().exhaustive(async || {
1452            in_send.send(());
1453            in_send.send(());
1454            in_send.send(());
1455
1456            let received = out_recv.collect::<Vec<_>>().await;
1457            assert!(received.into_iter().sum::<usize>() == 3);
1458        });
1459
1460        assert_eq!(instances, 4); // 2^{3 - 1}
1461    }
1462
1463    #[cfg(feature = "sim")]
1464    #[test]
1465    fn sim_send_bincode_m2o() {
1466        let mut flow = FlowBuilder::new();
1467        let cluster = flow.cluster::<()>();
1468        let node = flow.process::<()>();
1469
1470        let input = cluster.source_iter(q!(vec![1]));
1471
1472        let out_recv = input
1473            .send(&node, TCP.fail_stop().bincode())
1474            .entries()
1475            .batch(&node.tick(), nondet!(/** test */))
1476            .all_ticks()
1477            .sim_output();
1478
1479        let instances = flow
1480            .sim()
1481            .with_cluster_size(&cluster, 4)
1482            .exhaustive(async || {
1483                out_recv
1484                    .assert_yields_only_unordered(vec![
1485                        (MemberId::from_raw_id(0), 1),
1486                        (MemberId::from_raw_id(1), 1),
1487                        (MemberId::from_raw_id(2), 1),
1488                        (MemberId::from_raw_id(3), 1),
1489                    ])
1490                    .await
1491            });
1492
1493        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1494    }
1495
1496    #[cfg(feature = "sim")]
1497    #[test]
1498    fn sim_send_bincode_multiple_m2o() {
1499        let mut flow = FlowBuilder::new();
1500        let cluster1 = flow.cluster::<()>();
1501        let cluster2 = flow.cluster::<()>();
1502        let node = flow.process::<()>();
1503
1504        let out_recv_1 = cluster1
1505            .source_iter(q!(vec![1]))
1506            .send(&node, TCP.fail_stop().bincode())
1507            .entries()
1508            .sim_output();
1509
1510        let out_recv_2 = cluster2
1511            .source_iter(q!(vec![2]))
1512            .send(&node, TCP.fail_stop().bincode())
1513            .entries()
1514            .sim_output();
1515
1516        let instances = flow
1517            .sim()
1518            .with_cluster_size(&cluster1, 3)
1519            .with_cluster_size(&cluster2, 4)
1520            .exhaustive(async || {
1521                out_recv_1
1522                    .assert_yields_only_unordered(vec![
1523                        (MemberId::from_raw_id(0), 1),
1524                        (MemberId::from_raw_id(1), 1),
1525                        (MemberId::from_raw_id(2), 1),
1526                    ])
1527                    .await;
1528
1529                out_recv_2
1530                    .assert_yields_only_unordered(vec![
1531                        (MemberId::from_raw_id(0), 2),
1532                        (MemberId::from_raw_id(1), 2),
1533                        (MemberId::from_raw_id(2), 2),
1534                        (MemberId::from_raw_id(3), 2),
1535                    ])
1536                    .await;
1537            });
1538
1539        assert_eq!(instances, 1);
1540    }
1541
1542    #[cfg(feature = "sim")]
1543    #[test]
1544    fn sim_send_bincode_o2m() {
1545        let mut flow = FlowBuilder::new();
1546        let cluster = flow.cluster::<()>();
1547        let node = flow.process::<()>();
1548
1549        let input = node.source_iter(q!(vec![
1550            (MemberId::from_raw_id(0), 123),
1551            (MemberId::from_raw_id(1), 456),
1552        ]));
1553
1554        let out_recv = input
1555            .demux(&cluster, TCP.fail_stop().bincode())
1556            .map(q!(|x| x + 1))
1557            .send(&node, TCP.fail_stop().bincode())
1558            .entries()
1559            .sim_output();
1560
1561        flow.sim()
1562            .with_cluster_size(&cluster, 4)
1563            .exhaustive(async || {
1564                out_recv
1565                    .assert_yields_only_unordered(vec![
1566                        (MemberId::from_raw_id(0), 124),
1567                        (MemberId::from_raw_id(1), 457),
1568                    ])
1569                    .await
1570            });
1571    }
1572
1573    #[cfg(feature = "sim")]
1574    #[test]
1575    fn sim_broadcast_bincode_o2m() {
1576        let mut flow = FlowBuilder::new();
1577        let cluster = flow.cluster::<()>();
1578        let node = flow.process::<()>();
1579
1580        let input = node.source_iter(q!(vec![123, 456]));
1581
1582        let out_recv = input
1583            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1584            .map(q!(|x| x + 1))
1585            .send(&node, TCP.fail_stop().bincode())
1586            .entries()
1587            .sim_output();
1588
1589        let mut c_1_produced = false;
1590        let mut c_2_produced = false;
1591        let mut c_1_saw_457_but_not_124 = false;
1592
1593        flow.sim()
1594            .with_cluster_size(&cluster, 2)
1595            .exhaustive(async || {
1596                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1597
1598                // check that order is preserved
1599                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1600                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1601                    c_1_produced = true;
1602                }
1603
1604                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1605                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1606                    c_2_produced = true;
1607                }
1608
1609                if all_out.contains(&(MemberId::from_raw_id(0), 457))
1610                    && !all_out.contains(&(MemberId::from_raw_id(0), 124))
1611                {
1612                    c_1_saw_457_but_not_124 = true;
1613                }
1614            });
1615
1616        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1617
1618        // in at least one execution, the cluster member received 457 but not 124, this tests
1619        // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
1620        assert!(c_1_saw_457_but_not_124);
1621    }
1622
1623    #[cfg(feature = "sim")]
1624    #[test]
1625    fn sim_send_bincode_m2m() {
1626        let mut flow = FlowBuilder::new();
1627        let cluster = flow.cluster::<()>();
1628        let node = flow.process::<()>();
1629
1630        let input = node.source_iter(q!(vec![
1631            (MemberId::from_raw_id(0), 123),
1632            (MemberId::from_raw_id(1), 456),
1633        ]));
1634
1635        let out_recv = input
1636            .demux(&cluster, TCP.fail_stop().bincode())
1637            .map(q!(|x| x + 1))
1638            .flat_map_ordered(q!(|x| vec![
1639                (MemberId::from_raw_id(0), x),
1640                (MemberId::from_raw_id(1), x),
1641            ]))
1642            .demux(&cluster, TCP.fail_stop().bincode())
1643            .entries()
1644            .send(&node, TCP.fail_stop().bincode())
1645            .entries()
1646            .sim_output();
1647
1648        flow.sim()
1649            .with_cluster_size(&cluster, 4)
1650            .exhaustive(async || {
1651                out_recv
1652                    .assert_yields_only_unordered(vec![
1653                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1654                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1655                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1656                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1657                    ])
1658                    .await
1659            });
1660    }
1661
1662    #[cfg(feature = "sim")]
1663    #[test]
1664    fn sim_lossy_delayed_forever_o2o() {
1665        use std::collections::HashSet;
1666
1667        use crate::properties::manual_proof;
1668
1669        let mut flow = FlowBuilder::new();
1670        let node = flow.process::<()>();
1671        let node2 = flow.process::<()>();
1672
1673        let received = node
1674            .source_iter(q!(0..3_u32))
1675            .send(&node2, TCP.lossy_delayed_forever().bincode())
1676            .fold(
1677                q!(|| std::collections::HashSet::<u32>::new()),
1678                q!(
1679                    |set, v| {
1680                        set.insert(v);
1681                    },
1682                    commutative = manual_proof!(/** set insert is commutative */)
1683                ),
1684            );
1685
1686        let out_recv = sliced! {
1687            let snapshot = use(received, nondet!(/** test */));
1688            snapshot.into_stream()
1689        }
1690        .sim_output();
1691
1692        let mut saw_non_contiguous = false;
1693
1694        flow.sim().test_safety_only().exhaustive(async || {
1695            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1696
1697            // Check each individual snapshot for a non-contiguous subset.
1698            for set in &snapshots {
1699                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1700                if set.len() >= 2 && set.len() < 3 {
1701                    let min = *set.iter().min().unwrap();
1702                    let max = *set.iter().max().unwrap();
1703                    if set.len() < (max - min + 1) as usize {
1704                        saw_non_contiguous = true;
1705                    }
1706                }
1707            }
1708        });
1709
1710        assert!(
1711            saw_non_contiguous,
1712            "Expected at least one execution with a non-contiguous subset of inputs"
1713        );
1714    }
1715
1716    #[cfg(feature = "sim")]
1717    #[test]
1718    fn sim_broadcast_closed_o2m() {
1719        let mut flow = FlowBuilder::new();
1720        let cluster = flow.cluster::<()>();
1721        let node = flow.process::<()>();
1722
1723        let input = node.source_iter(q!(vec![123, 456]));
1724
1725        let out_recv = input
1726            .broadcast_closed(&cluster, TCP.fail_stop().bincode())
1727            .send(&node, TCP.fail_stop().bincode())
1728            .entries()
1729            .sim_output();
1730
1731        flow.sim()
1732            .with_cluster_size(&cluster, 2)
1733            .exhaustive(async || {
1734                out_recv
1735                    .assert_yields_only_unordered(vec![
1736                        (MemberId::from_raw_id(0), 123),
1737                        (MemberId::from_raw_id(0), 456),
1738                        (MemberId::from_raw_id(1), 123),
1739                        (MemberId::from_raw_id(1), 456),
1740                    ])
1741                    .await
1742            });
1743    }
1744
1745    #[cfg(feature = "sim")]
1746    #[test]
1747    fn sim_broadcast_closed_m2m() {
1748        let mut flow = FlowBuilder::new();
1749        let source = flow.cluster::<()>();
1750        let dest: crate::location::Cluster<'_, ()> = flow.cluster::<()>();
1751        let node = flow.process::<()>();
1752
1753        let input = source.source_iter(q!(vec![123]));
1754
1755        // Broadcast from source cluster to dest cluster, then collect at a process.
1756        let out_recv = input
1757            .broadcast_closed(&dest, TCP.fail_stop().bincode())
1758            .entries()
1759            .send(&node, TCP.fail_stop().bincode())
1760            .entries()
1761            .sim_output();
1762
1763        flow.sim()
1764            .with_cluster_size(&source, 2)
1765            .with_cluster_size(&dest, 2)
1766            .exhaustive(async || {
1767                // Each source member (0, 1) broadcasts 123 to each dest member (0, 1).
1768                // The dest members then send to the process keyed by dest member id.
1769                // Each dest member receives (source_0, 123) and (source_1, 123).
1770                out_recv
1771                    .assert_yields_only_unordered(vec![
1772                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 123)),
1773                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 123)),
1774                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 123)),
1775                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 123)),
1776                    ])
1777                    .await
1778            });
1779    }
1780}