hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::{Consistency, NoConsistency};
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23 ///
24 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26 /// API allows precise targeting of specific cluster members rather than broadcasting to
27 /// all members.
28 ///
29 /// # Example
30 /// ```rust
31 /// # #[cfg(feature = "deploy")] {
32 /// # use hydro_lang::prelude::*;
33 /// # use futures::StreamExt;
34 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35 /// let p1 = flow.process::<()>();
36 /// let workers: Cluster<()> = flow.cluster::<()>();
37 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40 /// .into_keyed()
41 /// .demux_bincode(&workers);
42 /// # on_worker.send_bincode(&p2).entries()
43 /// // if there are 4 members in the cluster, each receives one element
44 /// // - MemberId::<()>(0): [0]
45 /// // - MemberId::<()>(1): [1]
46 /// // - MemberId::<()>(2): [2]
47 /// // - MemberId::<()>(3): [3]
48 /// # }, |mut stream| async move {
49 /// # let mut results = Vec::new();
50 /// # for w in 0..4 {
51 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
52 /// # }
53 /// # results.sort();
54 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55 /// # }));
56 /// # }
57 /// ```
58 pub fn demux_bincode(
59 self,
60 other: &Cluster<'a, L2>,
61 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62 where
63 T: Serialize + DeserializeOwned,
64 {
65 self.demux(other, TCP.fail_stop().bincode())
66 }
67
68 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69 /// identifying the recipient for each group and using the configuration in `via` to set up the
70 /// message transport.
71 ///
72 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74 /// API allows precise targeting of specific cluster members rather than broadcasting to
75 /// all members.
76 ///
77 /// # Example
78 /// ```rust
79 /// # #[cfg(feature = "deploy")] {
80 /// # use hydro_lang::prelude::*;
81 /// # use futures::StreamExt;
82 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83 /// let p1 = flow.process::<()>();
84 /// let workers: Cluster<()> = flow.cluster::<()>();
85 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88 /// .into_keyed()
89 /// .demux(&workers, TCP.fail_stop().bincode());
90 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91 /// // if there are 4 members in the cluster, each receives one element
92 /// // - MemberId::<()>(0): [0]
93 /// // - MemberId::<()>(1): [1]
94 /// // - MemberId::<()>(2): [2]
95 /// // - MemberId::<()>(3): [3]
96 /// # }, |mut stream| async move {
97 /// # let mut results = Vec::new();
98 /// # for w in 0..4 {
99 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
100 /// # }
101 /// # results.sort();
102 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103 /// # }));
104 /// # }
105 /// ```
106 pub fn demux<N: NetworkFor<T>>(
107 self,
108 to: &Cluster<'a, L2>,
109 via: N,
110 ) -> Stream<
111 T,
112 // NoConsistency because there each replica member may receive different streams
113 Cluster<'a, L2, NoConsistency>,
114 Unbounded,
115 <O as MinOrder<N::OrderingGuarantee>>::Min,
116 R,
117 >
118 where
119 T: Serialize + DeserializeOwned,
120 O: MinOrder<N::OrderingGuarantee>,
121 {
122 let serialize_pipeline = Some(N::serialize_thunk(true));
123
124 let deserialize_pipeline = Some(N::deserialize_thunk(None));
125
126 let name = via.name();
127 if to.multiversioned() && name.is_none() {
128 panic!(
129 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
130 );
131 }
132
133 Stream::new(
134 to.clone(),
135 HydroNode::Network {
136 name: name.map(ToOwned::to_owned),
137 networking_info: N::networking_info(),
138 serialize_fn: serialize_pipeline.map(|e| e.into()),
139 instantiate_fn: DebugInstantiate::Building,
140 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
141 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
142 metadata: to.new_node_metadata(Stream::<
143 T,
144 Cluster<'a, L2>,
145 Unbounded,
146 <O as MinOrder<N::OrderingGuarantee>>::Min,
147 R,
148 >::collection_kind()),
149 },
150 )
151 }
152}
153
154impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
155 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
156{
157 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
158 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
159 /// compound key where the first element is the recipient's [`MemberId`] and the second element
160 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
161 /// messages.
162 ///
163 /// # Example
164 /// ```rust
165 /// # #[cfg(feature = "deploy")] {
166 /// # use hydro_lang::prelude::*;
167 /// # use futures::StreamExt;
168 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
169 /// let p1 = flow.process::<()>();
170 /// let workers: Cluster<()> = flow.cluster::<()>();
171 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
172 /// .source_iter(q!(vec![0, 1, 2, 3]))
173 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
174 /// .into_keyed();
175 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
176 /// # on_worker.entries().send_bincode(&p2).entries()
177 /// // if there are 4 members in the cluster, each receives one element
178 /// // - MemberId::<()>(0): { 0: [123] }
179 /// // - MemberId::<()>(1): { 1: [124] }
180 /// // - ...
181 /// # }, |mut stream| async move {
182 /// # let mut results = Vec::new();
183 /// # for w in 0..4 {
184 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
185 /// # }
186 /// # results.sort();
187 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
188 /// # }));
189 /// # }
190 /// ```
191 pub fn demux_bincode(
192 self,
193 other: &Cluster<'a, L2>,
194 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
195 where
196 K: Serialize + DeserializeOwned,
197 T: Serialize + DeserializeOwned,
198 {
199 self.demux(other, TCP.fail_stop().bincode())
200 }
201
202 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
203 /// compound key where the first element is the recipient's [`MemberId`] and the second element
204 /// is a key that will be sent along with the value, using the configuration in `via` to set up
205 /// the message transport.
206 ///
207 /// # Example
208 /// ```rust
209 /// # #[cfg(feature = "deploy")] {
210 /// # use hydro_lang::prelude::*;
211 /// # use futures::StreamExt;
212 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
213 /// let p1 = flow.process::<()>();
214 /// let workers: Cluster<()> = flow.cluster::<()>();
215 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
216 /// .source_iter(q!(vec![0, 1, 2, 3]))
217 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
218 /// .into_keyed();
219 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
220 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
221 /// // if there are 4 members in the cluster, each receives one element
222 /// // - MemberId::<()>(0): { 0: [123] }
223 /// // - MemberId::<()>(1): { 1: [124] }
224 /// // - ...
225 /// # }, |mut stream| async move {
226 /// # let mut results = Vec::new();
227 /// # for w in 0..4 {
228 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
229 /// # }
230 /// # results.sort();
231 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
232 /// # }));
233 /// # }
234 /// ```
235 pub fn demux<N: NetworkFor<(K, T)>>(
236 self,
237 to: &Cluster<'a, L2>,
238 via: N,
239 ) -> KeyedStream<
240 K,
241 T,
242 Cluster<'a, L2, NoConsistency>,
243 Unbounded,
244 <O as MinOrder<N::OrderingGuarantee>>::Min,
245 R,
246 >
247 where
248 K: Serialize + DeserializeOwned,
249 T: Serialize + DeserializeOwned,
250 O: MinOrder<N::OrderingGuarantee>,
251 {
252 let serialize_pipeline = Some(N::serialize_thunk(true));
253
254 let deserialize_pipeline = Some(N::deserialize_thunk(None));
255
256 let name = via.name();
257 if to.multiversioned() && name.is_none() {
258 panic!(
259 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
260 );
261 }
262
263 KeyedStream::new(
264 to.clone(),
265 HydroNode::Network {
266 name: name.map(ToOwned::to_owned),
267 networking_info: N::networking_info(),
268 serialize_fn: serialize_pipeline.map(|e| e.into()),
269 instantiate_fn: DebugInstantiate::Building,
270 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
271 input: Box::new(
272 self.entries()
273 .map(q!(|((id, k), v)| (id, (k, v))))
274 .ir_node
275 .replace(HydroNode::Placeholder),
276 ),
277 metadata: to.new_node_metadata(KeyedStream::<
278 K,
279 T,
280 Cluster<'a, L2>,
281 Unbounded,
282 <O as MinOrder<N::OrderingGuarantee>>::Min,
283 R,
284 >::collection_kind()),
285 },
286 )
287 }
288}
289
290impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
291 KeyedStream<MemberId<L2>, T, Cluster<'a, L, C>, B, O, R>
292{
293 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
294 /// Sends each group of this stream at each source member to a specific member of a destination
295 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
296 /// [`bincode`] to serialize/deserialize messages.
297 ///
298 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
299 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
300 /// API allows precise targeting of specific cluster members rather than broadcasting to all
301 /// members.
302 ///
303 /// Each cluster member sends its local stream elements, and they are collected at each
304 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
305 ///
306 /// # Example
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
312 /// # type Source = ();
313 /// # type Destination = ();
314 /// let source: Cluster<Source> = flow.cluster::<Source>();
315 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
316 /// .source_iter(q!(vec![0, 1, 2, 3]))
317 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
318 /// .into_keyed();
319 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
320 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
321 /// # all_received.entries().send_bincode(&p2).entries()
322 /// # }, |mut stream| async move {
323 /// // if there are 4 members in the destination cluster, each receives one message from each source member
324 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
325 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
326 /// // - ...
327 /// # let mut results = Vec::new();
328 /// # for w in 0..16 {
329 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
330 /// # }
331 /// # results.sort();
332 /// # assert_eq!(results, vec![
333 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
334 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
335 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
336 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
337 /// # ]);
338 /// # }));
339 /// # }
340 /// ```
341 pub fn demux_bincode(
342 self,
343 other: &Cluster<'a, L2>,
344 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
345 where
346 T: Serialize + DeserializeOwned,
347 {
348 self.demux(other, TCP.fail_stop().bincode())
349 }
350
351 /// Sends each group of this stream at each source member to a specific member of a destination
352 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
353 /// configuration in `via` to set up the message transport.
354 ///
355 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
356 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
357 /// API allows precise targeting of specific cluster members rather than broadcasting to all
358 /// members.
359 ///
360 /// Each cluster member sends its local stream elements, and they are collected at each
361 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
362 ///
363 /// # Example
364 /// ```rust
365 /// # #[cfg(feature = "deploy")] {
366 /// # use hydro_lang::prelude::*;
367 /// # use futures::StreamExt;
368 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
369 /// # type Source = ();
370 /// # type Destination = ();
371 /// let source: Cluster<Source> = flow.cluster::<Source>();
372 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
373 /// .source_iter(q!(vec![0, 1, 2, 3]))
374 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
375 /// .into_keyed();
376 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
377 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
378 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
379 /// # }, |mut stream| async move {
380 /// // if there are 4 members in the destination cluster, each receives one message from each source member
381 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
382 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
383 /// // - ...
384 /// # let mut results = Vec::new();
385 /// # for w in 0..16 {
386 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
387 /// # }
388 /// # results.sort();
389 /// # assert_eq!(results, vec![
390 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
391 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
392 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
393 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
394 /// # ]);
395 /// # }));
396 /// # }
397 /// ```
398 pub fn demux<N: NetworkFor<T>>(
399 self,
400 to: &Cluster<'a, L2>,
401 via: N,
402 ) -> KeyedStream<
403 MemberId<L>,
404 T,
405 Cluster<'a, L2, NoConsistency>,
406 Unbounded,
407 <O as MinOrder<N::OrderingGuarantee>>::Min,
408 R,
409 >
410 where
411 T: Serialize + DeserializeOwned,
412 O: MinOrder<N::OrderingGuarantee>,
413 {
414 let serialize_pipeline = Some(N::serialize_thunk(true));
415
416 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
417
418 let name = via.name();
419 if to.multiversioned() && name.is_none() {
420 panic!(
421 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
422 );
423 }
424
425 KeyedStream::new(
426 to.clone(),
427 HydroNode::Network {
428 name: name.map(ToOwned::to_owned),
429 networking_info: N::networking_info(),
430 serialize_fn: serialize_pipeline.map(|e| e.into()),
431 instantiate_fn: DebugInstantiate::Building,
432 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
433 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
434 metadata: to.new_node_metadata(KeyedStream::<
435 MemberId<L>,
436 T,
437 Cluster<'a, L2>,
438 Unbounded,
439 <O as MinOrder<N::OrderingGuarantee>>::Min,
440 R,
441 >::collection_kind()),
442 },
443 )
444 }
445}
446
447impl<'a, K, V, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
448 KeyedStream<K, V, Cluster<'a, L, C>, B, O, R>
449{
450 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
451 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
452 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
453 /// has a compound key where the first element is the sender's [`MemberId`] and the second
454 /// element is the original key.
455 ///
456 /// # Example
457 /// ```rust
458 /// # #[cfg(feature = "deploy")] {
459 /// # use hydro_lang::prelude::*;
460 /// # use futures::StreamExt;
461 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
462 /// # type Source = ();
463 /// # type Destination = ();
464 /// let source: Cluster<Source> = flow.cluster::<Source>();
465 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
466 /// .source_iter(q!(vec![0, 1, 2, 3]))
467 /// .map(q!(|x| (x, x + 123)))
468 /// .into_keyed();
469 /// let destination_process = flow.process::<Destination>();
470 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
471 /// # all_received.entries().send_bincode(&p2)
472 /// # }, |mut stream| async move {
473 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
474 /// // {
475 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
476 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
477 /// // ...
478 /// // }
479 /// # let mut results = Vec::new();
480 /// # for w in 0..16 {
481 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
482 /// # }
483 /// # results.sort();
484 /// # assert_eq!(results, vec![
485 /// # "((MemberId::<()>(0), 0), 123)",
486 /// # "((MemberId::<()>(0), 1), 124)",
487 /// # "((MemberId::<()>(0), 2), 125)",
488 /// # "((MemberId::<()>(0), 3), 126)",
489 /// # "((MemberId::<()>(1), 0), 123)",
490 /// # "((MemberId::<()>(1), 1), 124)",
491 /// # "((MemberId::<()>(1), 2), 125)",
492 /// # "((MemberId::<()>(1), 3), 126)",
493 /// # "((MemberId::<()>(2), 0), 123)",
494 /// # "((MemberId::<()>(2), 1), 124)",
495 /// # "((MemberId::<()>(2), 2), 125)",
496 /// # "((MemberId::<()>(2), 3), 126)",
497 /// # "((MemberId::<()>(3), 0), 123)",
498 /// # "((MemberId::<()>(3), 1), 124)",
499 /// # "((MemberId::<()>(3), 2), 125)",
500 /// # "((MemberId::<()>(3), 3), 126)",
501 /// # ]);
502 /// # }));
503 /// # }
504 /// ```
505 pub fn send_bincode<L2>(
506 self,
507 other: &Process<'a, L2>,
508 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
509 where
510 K: Serialize + DeserializeOwned,
511 V: Serialize + DeserializeOwned,
512 {
513 self.send(other, TCP.fail_stop().bincode())
514 }
515
516 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
517 /// network, using the configuration in `via` to set up the message transport. The resulting
518 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
519 /// the second element is the original key.
520 ///
521 /// # Example
522 /// ```rust
523 /// # #[cfg(feature = "deploy")] {
524 /// # use hydro_lang::prelude::*;
525 /// # use futures::StreamExt;
526 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
527 /// # type Source = ();
528 /// # type Destination = ();
529 /// let source: Cluster<Source> = flow.cluster::<Source>();
530 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
531 /// .source_iter(q!(vec![0, 1, 2, 3]))
532 /// .map(q!(|x| (x, x + 123)))
533 /// .into_keyed();
534 /// let destination_process = flow.process::<Destination>();
535 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
536 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
537 /// # }, |mut stream| async move {
538 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
539 /// // {
540 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
541 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
542 /// // ...
543 /// // }
544 /// # let mut results = Vec::new();
545 /// # for w in 0..16 {
546 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
547 /// # }
548 /// # results.sort();
549 /// # assert_eq!(results, vec![
550 /// # "((MemberId::<()>(0), 0), 123)",
551 /// # "((MemberId::<()>(0), 1), 124)",
552 /// # "((MemberId::<()>(0), 2), 125)",
553 /// # "((MemberId::<()>(0), 3), 126)",
554 /// # "((MemberId::<()>(1), 0), 123)",
555 /// # "((MemberId::<()>(1), 1), 124)",
556 /// # "((MemberId::<()>(1), 2), 125)",
557 /// # "((MemberId::<()>(1), 3), 126)",
558 /// # "((MemberId::<()>(2), 0), 123)",
559 /// # "((MemberId::<()>(2), 1), 124)",
560 /// # "((MemberId::<()>(2), 2), 125)",
561 /// # "((MemberId::<()>(2), 3), 126)",
562 /// # "((MemberId::<()>(3), 0), 123)",
563 /// # "((MemberId::<()>(3), 1), 124)",
564 /// # "((MemberId::<()>(3), 2), 125)",
565 /// # "((MemberId::<()>(3), 3), 126)",
566 /// # ]);
567 /// # }));
568 /// # }
569 /// ```
570 pub fn send<L2, N: NetworkFor<(K, V)>>(
571 self,
572 to: &Process<'a, L2>,
573 via: N,
574 ) -> KeyedStream<
575 (MemberId<L>, K),
576 V,
577 Process<'a, L2>,
578 Unbounded,
579 <O as MinOrder<N::OrderingGuarantee>>::Min,
580 R,
581 >
582 where
583 K: Serialize + DeserializeOwned,
584 V: Serialize + DeserializeOwned,
585 O: MinOrder<N::OrderingGuarantee>,
586 {
587 let serialize_pipeline = Some(N::serialize_thunk(false));
588
589 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
590
591 let name = via.name();
592 if to.multiversioned() && name.is_none() {
593 panic!(
594 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
595 );
596 }
597
598 let raw_stream: Stream<
599 (MemberId<L>, (K, V)),
600 Process<'a, L2>,
601 Unbounded,
602 <O as MinOrder<N::OrderingGuarantee>>::Min,
603 R,
604 > = Stream::new(
605 to.clone(),
606 HydroNode::Network {
607 name: name.map(ToOwned::to_owned),
608 networking_info: N::networking_info(),
609 serialize_fn: serialize_pipeline.map(|e| e.into()),
610 instantiate_fn: DebugInstantiate::Building,
611 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
612 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
613 metadata: to.new_node_metadata(Stream::<
614 (MemberId<L>, (K, V)),
615 Cluster<'a, L2>,
616 Unbounded,
617 <O as MinOrder<N::OrderingGuarantee>>::Min,
618 R,
619 >::collection_kind()),
620 },
621 );
622
623 raw_stream
624 .map(q!(|(sender, (k, v))| ((sender, k), v)))
625 .into_keyed()
626 }
627}