Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::{Consistency, NoConsistency};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23    ///
24    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26    /// API allows precise targeting of specific cluster members rather than broadcasting to
27    /// all members.
28    ///
29    /// # Example
30    /// ```rust
31    /// # #[cfg(feature = "deploy")] {
32    /// # use hydro_lang::prelude::*;
33    /// # use futures::StreamExt;
34    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35    /// let p1 = flow.process::<()>();
36    /// let workers: Cluster<()> = flow.cluster::<()>();
37    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40    ///     .into_keyed()
41    ///     .demux_bincode(&workers);
42    /// # on_worker.send_bincode(&p2).entries()
43    /// // if there are 4 members in the cluster, each receives one element
44    /// // - MemberId::<()>(0): [0]
45    /// // - MemberId::<()>(1): [1]
46    /// // - MemberId::<()>(2): [2]
47    /// // - MemberId::<()>(3): [3]
48    /// # }, |mut stream| async move {
49    /// # let mut results = Vec::new();
50    /// # for w in 0..4 {
51    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
52    /// # }
53    /// # results.sort();
54    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55    /// # }));
56    /// # }
57    /// ```
58    pub fn demux_bincode(
59        self,
60        other: &Cluster<'a, L2>,
61    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62    where
63        T: Serialize + DeserializeOwned,
64    {
65        self.demux(other, TCP.fail_stop().bincode())
66    }
67
68    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69    /// identifying the recipient for each group and using the configuration in `via` to set up the
70    /// message transport.
71    ///
72    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74    /// API allows precise targeting of specific cluster members rather than broadcasting to
75    /// all members.
76    ///
77    /// # Example
78    /// ```rust
79    /// # #[cfg(feature = "deploy")] {
80    /// # use hydro_lang::prelude::*;
81    /// # use futures::StreamExt;
82    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83    /// let p1 = flow.process::<()>();
84    /// let workers: Cluster<()> = flow.cluster::<()>();
85    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88    ///     .into_keyed()
89    ///     .demux(&workers, TCP.fail_stop().bincode());
90    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91    /// // if there are 4 members in the cluster, each receives one element
92    /// // - MemberId::<()>(0): [0]
93    /// // - MemberId::<()>(1): [1]
94    /// // - MemberId::<()>(2): [2]
95    /// // - MemberId::<()>(3): [3]
96    /// # }, |mut stream| async move {
97    /// # let mut results = Vec::new();
98    /// # for w in 0..4 {
99    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
100    /// # }
101    /// # results.sort();
102    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103    /// # }));
104    /// # }
105    /// ```
106    pub fn demux<N: NetworkFor<T>>(
107        self,
108        to: &Cluster<'a, L2>,
109        via: N,
110    ) -> Stream<
111        T,
112        // NoConsistency because there each replica member may receive different streams
113        Cluster<'a, L2, NoConsistency>,
114        Unbounded,
115        <O as MinOrder<N::OrderingGuarantee>>::Min,
116        R,
117    >
118    where
119        T: Serialize + DeserializeOwned,
120        O: MinOrder<N::OrderingGuarantee>,
121    {
122        let serialize_pipeline = Some(N::serialize_thunk(true));
123
124        let deserialize_pipeline = Some(N::deserialize_thunk(None));
125
126        let name = via.name();
127        if to.multiversioned() && name.is_none() {
128            panic!(
129                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
130            );
131        }
132
133        Stream::new(
134            to.clone(),
135            HydroNode::Network {
136                name: name.map(ToOwned::to_owned),
137                networking_info: N::networking_info(),
138                serialize_fn: serialize_pipeline.map(|e| e.into()),
139                instantiate_fn: DebugInstantiate::Building,
140                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
141                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
142                metadata: to.new_node_metadata(Stream::<
143                    T,
144                    Cluster<'a, L2>,
145                    Unbounded,
146                    <O as MinOrder<N::OrderingGuarantee>>::Min,
147                    R,
148                >::collection_kind()),
149            },
150        )
151    }
152}
153
154impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
155    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
156{
157    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
158    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
159    /// compound key where the first element is the recipient's [`MemberId`] and the second element
160    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
161    /// messages.
162    ///
163    /// # Example
164    /// ```rust
165    /// # #[cfg(feature = "deploy")] {
166    /// # use hydro_lang::prelude::*;
167    /// # use futures::StreamExt;
168    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
169    /// let p1 = flow.process::<()>();
170    /// let workers: Cluster<()> = flow.cluster::<()>();
171    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
172    ///     .source_iter(q!(vec![0, 1, 2, 3]))
173    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
174    ///     .into_keyed();
175    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
176    /// # on_worker.entries().send_bincode(&p2).entries()
177    /// // if there are 4 members in the cluster, each receives one element
178    /// // - MemberId::<()>(0): { 0: [123] }
179    /// // - MemberId::<()>(1): { 1: [124] }
180    /// // - ...
181    /// # }, |mut stream| async move {
182    /// # let mut results = Vec::new();
183    /// # for w in 0..4 {
184    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
185    /// # }
186    /// # results.sort();
187    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
188    /// # }));
189    /// # }
190    /// ```
191    pub fn demux_bincode(
192        self,
193        other: &Cluster<'a, L2>,
194    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
195    where
196        K: Serialize + DeserializeOwned,
197        T: Serialize + DeserializeOwned,
198    {
199        self.demux(other, TCP.fail_stop().bincode())
200    }
201
202    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
203    /// compound key where the first element is the recipient's [`MemberId`] and the second element
204    /// is a key that will be sent along with the value, using the configuration in `via` to set up
205    /// the message transport.
206    ///
207    /// # Example
208    /// ```rust
209    /// # #[cfg(feature = "deploy")] {
210    /// # use hydro_lang::prelude::*;
211    /// # use futures::StreamExt;
212    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
213    /// let p1 = flow.process::<()>();
214    /// let workers: Cluster<()> = flow.cluster::<()>();
215    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
216    ///     .source_iter(q!(vec![0, 1, 2, 3]))
217    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
218    ///     .into_keyed();
219    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
220    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
221    /// // if there are 4 members in the cluster, each receives one element
222    /// // - MemberId::<()>(0): { 0: [123] }
223    /// // - MemberId::<()>(1): { 1: [124] }
224    /// // - ...
225    /// # }, |mut stream| async move {
226    /// # let mut results = Vec::new();
227    /// # for w in 0..4 {
228    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
229    /// # }
230    /// # results.sort();
231    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
232    /// # }));
233    /// # }
234    /// ```
235    pub fn demux<N: NetworkFor<(K, T)>>(
236        self,
237        to: &Cluster<'a, L2>,
238        via: N,
239    ) -> KeyedStream<
240        K,
241        T,
242        Cluster<'a, L2, NoConsistency>,
243        Unbounded,
244        <O as MinOrder<N::OrderingGuarantee>>::Min,
245        R,
246    >
247    where
248        K: Serialize + DeserializeOwned,
249        T: Serialize + DeserializeOwned,
250        O: MinOrder<N::OrderingGuarantee>,
251    {
252        let serialize_pipeline = Some(N::serialize_thunk(true));
253
254        let deserialize_pipeline = Some(N::deserialize_thunk(None));
255
256        let name = via.name();
257        if to.multiversioned() && name.is_none() {
258            panic!(
259                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
260            );
261        }
262
263        KeyedStream::new(
264            to.clone(),
265            HydroNode::Network {
266                name: name.map(ToOwned::to_owned),
267                networking_info: N::networking_info(),
268                serialize_fn: serialize_pipeline.map(|e| e.into()),
269                instantiate_fn: DebugInstantiate::Building,
270                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
271                input: Box::new(
272                    self.entries()
273                        .map(q!(|((id, k), v)| (id, (k, v))))
274                        .ir_node
275                        .replace(HydroNode::Placeholder),
276                ),
277                metadata: to.new_node_metadata(KeyedStream::<
278                    K,
279                    T,
280                    Cluster<'a, L2>,
281                    Unbounded,
282                    <O as MinOrder<N::OrderingGuarantee>>::Min,
283                    R,
284                >::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
291    KeyedStream<MemberId<L2>, T, Cluster<'a, L, C>, B, O, R>
292{
293    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
294    /// Sends each group of this stream at each source member to a specific member of a destination
295    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
296    /// [`bincode`] to serialize/deserialize messages.
297    ///
298    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
299    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
300    /// API allows precise targeting of specific cluster members rather than broadcasting to all
301    /// members.
302    ///
303    /// Each cluster member sends its local stream elements, and they are collected at each
304    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
305    ///
306    /// # Example
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
312    /// # type Source = ();
313    /// # type Destination = ();
314    /// let source: Cluster<Source> = flow.cluster::<Source>();
315    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
316    ///     .source_iter(q!(vec![0, 1, 2, 3]))
317    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
318    ///     .into_keyed();
319    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
320    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
321    /// # all_received.entries().send_bincode(&p2).entries()
322    /// # }, |mut stream| async move {
323    /// // if there are 4 members in the destination cluster, each receives one message from each source member
324    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
325    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
326    /// // - ...
327    /// # let mut results = Vec::new();
328    /// # for w in 0..16 {
329    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
330    /// # }
331    /// # results.sort();
332    /// # assert_eq!(results, vec![
333    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
334    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
335    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
336    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
337    /// # ]);
338    /// # }));
339    /// # }
340    /// ```
341    pub fn demux_bincode(
342        self,
343        other: &Cluster<'a, L2>,
344    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
345    where
346        T: Serialize + DeserializeOwned,
347    {
348        self.demux(other, TCP.fail_stop().bincode())
349    }
350
351    /// Sends each group of this stream at each source member to a specific member of a destination
352    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
353    /// configuration in `via` to set up the message transport.
354    ///
355    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
356    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
357    /// API allows precise targeting of specific cluster members rather than broadcasting to all
358    /// members.
359    ///
360    /// Each cluster member sends its local stream elements, and they are collected at each
361    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
362    ///
363    /// # Example
364    /// ```rust
365    /// # #[cfg(feature = "deploy")] {
366    /// # use hydro_lang::prelude::*;
367    /// # use futures::StreamExt;
368    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
369    /// # type Source = ();
370    /// # type Destination = ();
371    /// let source: Cluster<Source> = flow.cluster::<Source>();
372    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
373    ///     .source_iter(q!(vec![0, 1, 2, 3]))
374    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
375    ///     .into_keyed();
376    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
377    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
378    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
379    /// # }, |mut stream| async move {
380    /// // if there are 4 members in the destination cluster, each receives one message from each source member
381    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
382    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
383    /// // - ...
384    /// # let mut results = Vec::new();
385    /// # for w in 0..16 {
386    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
387    /// # }
388    /// # results.sort();
389    /// # assert_eq!(results, vec![
390    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
391    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
392    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
393    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
394    /// # ]);
395    /// # }));
396    /// # }
397    /// ```
398    pub fn demux<N: NetworkFor<T>>(
399        self,
400        to: &Cluster<'a, L2>,
401        via: N,
402    ) -> KeyedStream<
403        MemberId<L>,
404        T,
405        Cluster<'a, L2, NoConsistency>,
406        Unbounded,
407        <O as MinOrder<N::OrderingGuarantee>>::Min,
408        R,
409    >
410    where
411        T: Serialize + DeserializeOwned,
412        O: MinOrder<N::OrderingGuarantee>,
413    {
414        let serialize_pipeline = Some(N::serialize_thunk(true));
415
416        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
417
418        let name = via.name();
419        if to.multiversioned() && name.is_none() {
420            panic!(
421                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
422            );
423        }
424
425        KeyedStream::new(
426            to.clone(),
427            HydroNode::Network {
428                name: name.map(ToOwned::to_owned),
429                networking_info: N::networking_info(),
430                serialize_fn: serialize_pipeline.map(|e| e.into()),
431                instantiate_fn: DebugInstantiate::Building,
432                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
433                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
434                metadata: to.new_node_metadata(KeyedStream::<
435                    MemberId<L>,
436                    T,
437                    Cluster<'a, L2>,
438                    Unbounded,
439                    <O as MinOrder<N::OrderingGuarantee>>::Min,
440                    R,
441                >::collection_kind()),
442            },
443        )
444    }
445}
446
447impl<'a, K, V, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
448    KeyedStream<K, V, Cluster<'a, L, C>, B, O, R>
449{
450    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
451    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
452    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
453    /// has a compound key where the first element is the sender's [`MemberId`] and the second
454    /// element is the original key.
455    ///
456    /// # Example
457    /// ```rust
458    /// # #[cfg(feature = "deploy")] {
459    /// # use hydro_lang::prelude::*;
460    /// # use futures::StreamExt;
461    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
462    /// # type Source = ();
463    /// # type Destination = ();
464    /// let source: Cluster<Source> = flow.cluster::<Source>();
465    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
466    ///     .source_iter(q!(vec![0, 1, 2, 3]))
467    ///     .map(q!(|x| (x, x + 123)))
468    ///     .into_keyed();
469    /// let destination_process = flow.process::<Destination>();
470    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
471    /// # all_received.entries().send_bincode(&p2)
472    /// # }, |mut stream| async move {
473    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
474    /// // {
475    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
476    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
477    /// //     ...
478    /// // }
479    /// # let mut results = Vec::new();
480    /// # for w in 0..16 {
481    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
482    /// # }
483    /// # results.sort();
484    /// # assert_eq!(results, vec![
485    /// #   "((MemberId::<()>(0), 0), 123)",
486    /// #   "((MemberId::<()>(0), 1), 124)",
487    /// #   "((MemberId::<()>(0), 2), 125)",
488    /// #   "((MemberId::<()>(0), 3), 126)",
489    /// #   "((MemberId::<()>(1), 0), 123)",
490    /// #   "((MemberId::<()>(1), 1), 124)",
491    /// #   "((MemberId::<()>(1), 2), 125)",
492    /// #   "((MemberId::<()>(1), 3), 126)",
493    /// #   "((MemberId::<()>(2), 0), 123)",
494    /// #   "((MemberId::<()>(2), 1), 124)",
495    /// #   "((MemberId::<()>(2), 2), 125)",
496    /// #   "((MemberId::<()>(2), 3), 126)",
497    /// #   "((MemberId::<()>(3), 0), 123)",
498    /// #   "((MemberId::<()>(3), 1), 124)",
499    /// #   "((MemberId::<()>(3), 2), 125)",
500    /// #   "((MemberId::<()>(3), 3), 126)",
501    /// # ]);
502    /// # }));
503    /// # }
504    /// ```
505    pub fn send_bincode<L2>(
506        self,
507        other: &Process<'a, L2>,
508    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
509    where
510        K: Serialize + DeserializeOwned,
511        V: Serialize + DeserializeOwned,
512    {
513        self.send(other, TCP.fail_stop().bincode())
514    }
515
516    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
517    /// network, using the configuration in `via` to set up the message transport. The resulting
518    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
519    /// the second element is the original key.
520    ///
521    /// # Example
522    /// ```rust
523    /// # #[cfg(feature = "deploy")] {
524    /// # use hydro_lang::prelude::*;
525    /// # use futures::StreamExt;
526    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
527    /// # type Source = ();
528    /// # type Destination = ();
529    /// let source: Cluster<Source> = flow.cluster::<Source>();
530    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
531    ///     .source_iter(q!(vec![0, 1, 2, 3]))
532    ///     .map(q!(|x| (x, x + 123)))
533    ///     .into_keyed();
534    /// let destination_process = flow.process::<Destination>();
535    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
536    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
537    /// # }, |mut stream| async move {
538    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
539    /// // {
540    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
541    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
542    /// //     ...
543    /// // }
544    /// # let mut results = Vec::new();
545    /// # for w in 0..16 {
546    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
547    /// # }
548    /// # results.sort();
549    /// # assert_eq!(results, vec![
550    /// #   "((MemberId::<()>(0), 0), 123)",
551    /// #   "((MemberId::<()>(0), 1), 124)",
552    /// #   "((MemberId::<()>(0), 2), 125)",
553    /// #   "((MemberId::<()>(0), 3), 126)",
554    /// #   "((MemberId::<()>(1), 0), 123)",
555    /// #   "((MemberId::<()>(1), 1), 124)",
556    /// #   "((MemberId::<()>(1), 2), 125)",
557    /// #   "((MemberId::<()>(1), 3), 126)",
558    /// #   "((MemberId::<()>(2), 0), 123)",
559    /// #   "((MemberId::<()>(2), 1), 124)",
560    /// #   "((MemberId::<()>(2), 2), 125)",
561    /// #   "((MemberId::<()>(2), 3), 126)",
562    /// #   "((MemberId::<()>(3), 0), 123)",
563    /// #   "((MemberId::<()>(3), 1), 124)",
564    /// #   "((MemberId::<()>(3), 2), 125)",
565    /// #   "((MemberId::<()>(3), 3), 126)",
566    /// # ]);
567    /// # }));
568    /// # }
569    /// ```
570    pub fn send<L2, N: NetworkFor<(K, V)>>(
571        self,
572        to: &Process<'a, L2>,
573        via: N,
574    ) -> KeyedStream<
575        (MemberId<L>, K),
576        V,
577        Process<'a, L2>,
578        Unbounded,
579        <O as MinOrder<N::OrderingGuarantee>>::Min,
580        R,
581    >
582    where
583        K: Serialize + DeserializeOwned,
584        V: Serialize + DeserializeOwned,
585        O: MinOrder<N::OrderingGuarantee>,
586    {
587        let serialize_pipeline = Some(N::serialize_thunk(false));
588
589        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
590
591        let name = via.name();
592        if to.multiversioned() && name.is_none() {
593            panic!(
594                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
595            );
596        }
597
598        let raw_stream: Stream<
599            (MemberId<L>, (K, V)),
600            Process<'a, L2>,
601            Unbounded,
602            <O as MinOrder<N::OrderingGuarantee>>::Min,
603            R,
604        > = Stream::new(
605            to.clone(),
606            HydroNode::Network {
607                name: name.map(ToOwned::to_owned),
608                networking_info: N::networking_info(),
609                serialize_fn: serialize_pipeline.map(|e| e.into()),
610                instantiate_fn: DebugInstantiate::Building,
611                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
612                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
613                metadata: to.new_node_metadata(Stream::<
614                    (MemberId<L>, (K, V)),
615                    Cluster<'a, L2>,
616                    Unbounded,
617                    <O as MinOrder<N::OrderingGuarantee>>::Min,
618                    R,
619                >::collection_kind()),
620            },
621        );
622
623        raw_stream
624            .map(q!(|(sender, (k, v))| ((sender, k), v)))
625            .into_keyed()
626    }
627}