1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54#[cfg(feature = "sim")]
55use crate::sim::SimSender;
56use crate::staging_util::get_this_crate;
57
58pub mod dynamic;
59
60pub mod external_process;
61pub use external_process::External;
62
63pub mod process;
64pub use process::Process;
65
66pub mod cluster;
67pub use cluster::Cluster;
68
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72pub mod tick;
73pub use tick::{Atomic, Tick};
74
/// A change in membership: a member either joined or left.
///
/// Within this file it is the value type of the keyed membership streams returned by
/// the cluster-membership source and by the multi-client external bindings.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member joined.
    Joined,
    /// The member left.
    Left,
}
84
/// A hint about how a network port should be set up.
///
/// In this file it is stored as the `port_hint` of external input nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific preference.
    // NOTE(review): presumably lets the deployment backend pick the transport — confirm.
    Auto,
    /// Request a TCP port.
    // NOTE(review): presumably `Some(port)` pins the port number and `None` leaves it
    // to be assigned — confirm against the deployment backend.
    TcpPort(Option<u16>),
}
100
101pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
102 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
103}
104
// Declares `LocationKey` as a `slotmap` key type and exports it through `stageleft`.
#[stageleft::export(LocationKey)]
new_key_type! {
    // Key type used in this module to identify a location.
    pub struct LocationKey;
}
110
111impl std::fmt::Display for LocationKey {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "loc{:?}", self.data()) }
115}
116
117impl std::str::FromStr for LocationKey {
120 type Err = Option<ParseIntError>;
121
122 fn from_str(s: &str) -> Result<Self, Self::Err> {
123 let nvn = s.strip_prefix("loc").ok_or(None)?;
124 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
125 let idx: u64 = idx.parse()?;
126 let ver: u64 = ver.parse()?;
127 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
128 }
129}
130
131impl LocationKey {
132 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
138 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); #[cfg(test)]
142 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); }
144
145impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
147 type O = LocationKey;
148
149 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
150 where
151 Self: Sized,
152 {
153 let root = get_this_crate();
154 let n = Key::data(&self).as_ffi();
155 (
156 QuoteTokens {
157 prelude: None,
158 expr: Some(quote! {
159 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
160 }),
161 },
162 (),
163 )
164 }
165}
166
/// The kind of a location: a process, a cluster, or an external.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process location.
    Process,
    /// A cluster location.
    Cluster,
    /// An external location.
    External,
}
177
/// Marker trait for [`Location`]s; it is required (`Self: TopLevel<'a>`) by the source and
/// external-binding methods of [`Location`].
// NOTE(review): presumably implemented only for locations outside a tick — confirm.
pub trait TopLevel<'a>: Location<'a> {}
180
181#[expect(
195 private_bounds,
196 reason = "only internal Hydro code can define location types"
197)]
198pub trait Location<'a>: DynLocation {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;

    /// The location type returned by [`Location::drop_consistency`]. Dropping consistency
    /// from that type again yields the same type.
    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;

    /// Returns the [`Location::Root`] of this location.
    fn root(&self) -> Self::Root;

    /// Returns the [`Location::DropConsistency`] form of this location.
    fn drop_consistency(&self) -> Self::DropConsistency;
    /// Returns the [`ClusterConsistency`] of this location type, if it has one.
    fn consistency() -> Option<ClusterConsistency>;
218
219 fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
221 L2::from_drop_consistency(self.drop_consistency())
222 }
223
    /// Builds `Self` from its [`Location::DropConsistency`] form; used by
    /// [`Location::with_consistency_of`].
    #[doc(hidden)]
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
226
227 fn try_tick(&self) -> Option<Tick<Self>> {
234 if Self::is_top_level() {
235 let id = self.flow_state().borrow_mut().next_clock_id();
236 Some(Tick {
237 id,
238 l: self.clone(),
239 })
240 } else {
241 None
242 }
243 }
244
245 fn id(&self) -> LocationId {
247 DynLocation::dyn_id(self)
248 }
249
250 fn tick(&self) -> Tick<Self> {
276 if let LocationId::Tick(_, _) = self.id() {
277 panic!("cannot create nested ticks");
278 }
279
280 let id = self.flow_state().borrow_mut().next_clock_id();
281 Tick {
282 id,
283 l: self.clone(),
284 }
285 }
286
287 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
312 where
313 Self: TopLevel<'a> + Sized,
314 {
315 Stream::new(
316 self.clone(),
317 HydroNode::Source {
318 source: HydroSource::Spin(),
319 metadata: self.new_node_metadata(Stream::<
320 (),
321 Self,
322 Unbounded,
323 TotalOrder,
324 ExactlyOnce,
325 >::collection_kind()),
326 },
327 )
328 }
329
330 fn source_stream<T, E>(
351 &self,
352 e: impl QuotedWithContext<'a, E, Self>,
353 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
354 where
355 E: FuturesStream<Item = T> + Unpin,
356 Self: TopLevel<'a> + Sized,
357 {
358 let e = e.splice_untyped_ctx(self);
359
360 let target_location = self.drop_consistency();
361 Stream::new(
362 target_location.clone(),
363 HydroNode::Source {
364 source: HydroSource::Stream(e.into()),
365 metadata: target_location.new_node_metadata(Stream::<
366 T,
367 Self::DropConsistency,
368 Unbounded,
369 TotalOrder,
370 ExactlyOnce,
371 >::collection_kind()),
372 },
373 )
374 }
375
376 fn source_iter<T, E>(
398 &self,
399 e: impl QuotedWithContext<'a, E, Self>,
400 ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
401 where
402 E: IntoIterator<Item = T>,
403 Self: Sized,
404 {
405 let e = e.splice_typed_ctx(self);
406
407 let target_location = self.drop_consistency();
408 Stream::new(
409 target_location.clone(),
410 HydroNode::Source {
411 source: HydroSource::Iter(e.into()),
412 metadata: target_location.new_node_metadata(Stream::<
413 T,
414 Self::DropConsistency,
415 Bounded,
416 TotalOrder,
417 ExactlyOnce,
418 >::collection_kind()),
419 },
420 )
421 }
422
423 #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
424 fn source_cluster_members<C: 'a>(
463 &self,
464 cluster: &Cluster<'a, C>,
465 nondet_start: NonDet,
466 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
467 where
468 Self: TopLevel<'a> + Sized,
469 {
470 self.source_cluster_membership_stream(cluster, nondet_start)
471 }
472
473 fn source_cluster_membership_stream<C: 'a>(
512 &self,
513 cluster: &Cluster<'a, C>,
514 _nondet_start: NonDet,
515 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
516 where
517 Self: TopLevel<'a> + Sized,
518 {
519 let target_consistency = self.drop_consistency();
520 Stream::new(
521 target_consistency.clone(),
522 HydroNode::Source {
523 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
524 metadata: target_consistency.new_node_metadata(Stream::<
525 (TaglessMemberId, MembershipEvent),
526 Self,
527 Unbounded,
528 TotalOrder,
529 ExactlyOnce,
530 >::collection_kind(
531 )),
532 },
533 )
534 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
535 .into_keyed()
536 }
537
538 fn source_external_bytes<L>(
546 &self,
547 from: &External<L>,
548 ) -> (
549 ExternalBytesPort,
550 Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
551 )
552 where
553 Self: TopLevel<'a> + Sized,
554 {
555 let (port, stream, sink) =
556 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
557
558 sink.complete(stream.location().source_iter(q!([])));
559
560 (port, stream)
561 }
562
563 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
570 &self,
571 from: &External<L>,
572 ) -> (
573 ExternalBincodeSink<T, NotMany, O, R>,
574 Stream<T, Self::DropConsistency, Unbounded, O, R>,
575 )
576 where
577 Self: TopLevel<'a> + Sized,
578 T: Serialize + DeserializeOwned,
579 {
580 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
581 sink.complete(stream.location().source_iter(q!([])));
582
583 (
584 ExternalBincodeSink {
585 process_key: from.key,
586 port_id: port.port_id,
587 _phantom: PhantomData,
588 },
589 stream.weaken_ordering().weaken_retries(),
590 )
591 }
592
593 #[cfg(feature = "sim")]
598 fn sim_input<T, O: Ordering, R: Retries>(
599 &self,
600 ) -> (
601 SimSender<T, O, R>,
602 Stream<T, Self::DropConsistency, Unbounded, O, R>,
603 )
604 where
605 Self: TopLevel<'a> + Sized,
606 T: Serialize + DeserializeOwned,
607 {
608 let external_location: External<'a, ()> = External {
609 key: LocationKey::FIRST,
610 flow_state: self.flow_state().clone(),
611 _phantom: PhantomData,
612 };
613
614 let (external, stream) = self.source_external_bincode(&external_location);
615
616 (SimSender(external.port_id, PhantomData), stream)
617 }
618
619 fn embedded_input<T>(
625 &self,
626 name: impl Into<String>,
627 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
628 where
629 Self: TopLevel<'a> + Sized,
630 {
631 let ident = syn::Ident::new(&name.into(), Span::call_site());
632
633 let target_location = self.drop_consistency();
634 Stream::new(
635 target_location.clone(),
636 HydroNode::Source {
637 source: HydroSource::Embedded(ident),
638 metadata: target_location.new_node_metadata(Stream::<
639 T,
640 Self,
641 Unbounded,
642 TotalOrder,
643 ExactlyOnce,
644 >::collection_kind()),
645 },
646 )
647 }
648
649 fn embedded_singleton_input<T>(
655 &self,
656 name: impl Into<String>,
657 ) -> Singleton<T, Self::DropConsistency, Bounded>
658 where
659 Self: TopLevel<'a> + Sized,
660 {
661 let ident = syn::Ident::new(&name.into(), Span::call_site());
662
663 let target_location = self.drop_consistency();
664 Singleton::new(
665 target_location.clone(),
666 HydroNode::Source {
667 source: HydroSource::EmbeddedSingleton(ident),
668 metadata: target_location
669 .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
670 },
671 )
672 }
673
    /// Binds a single-client, bidirectional connection between this location and the
    /// external `from`, framed with `Codec`.
    ///
    /// Returns, in order:
    /// - the [`ExternalBytesPort`] naming the external's key and the newly allocated port,
    /// - the stream of items decoded by `Codec` on that port, and
    /// - a [`ForwardHandle`] to be completed with the stream of `T` values to send back.
    ///
    /// `port_hint` is recorded on the external input node.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<NotMany>,
        Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state; this borrow ends with the statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
        let target_consistency = self.drop_consistency();

        // Forward reference standing in for the outbound stream the caller supplies later.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
            T,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >>();
        // This guard is a named local, so it lives until the function returns.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outbound half: the forward reference's IR node is moved into the root and a
        // placeholder is left behind. No serializer is attached.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Inbound half: each element is the codec's decode result for one frame.
        let raw_stream: Stream<
            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: target_consistency.new_node_metadata(Stream::<
                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // Unwraps the `Result` elements into the decoded items.
            // NOTE(review): presumably `Err` frames are dropped here — confirm.
            raw_stream.flatten_ordered(),
            fwd_ref,
        )
    }
791
    /// Binds a single-client, bidirectional bincode connection between this location and the
    /// external `from`.
    ///
    /// Returns, in order:
    /// - the [`ExternalBincodeBidi`] handle naming the external's key and the allocated port,
    /// - the stream of `InT` values deserialized from incoming length-delimited frames, and
    /// - a [`ForwardHandle`] to be completed with the stream of `OutT` values to send back.
    ///
    /// The generated serializer and deserializer both call `unwrap()`, so a bincode failure or
    /// a frame-decoding error panics in the generated code.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, NotMany>,
        Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state; this borrow ends with the statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        let target_consistency = self.drop_consistency();
        // Forward reference standing in for the outbound stream the caller supplies later.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
            OutT,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >>();
        // This guard is a named local, so it lives until the function returns.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Serializer expression: bincode-encodes each `OutT` and converts the bytes with `.into()`.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        // Outbound half: the forward reference's IR node is moved into the root and a
        // placeholder is left behind.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Deserializer expression: unwraps the frame result, then bincode-decodes it as `InT`.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
            }
        };

        // Inbound half: frames use `LengthDelimitedCodec` and the port hint is always `Auto`.
        let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
            Stream::new(
                target_consistency.clone(),
                HydroNode::ExternalInput {
                    from_external_key: from.key,
                    from_port_id: next_external_port_id,
                    from_many: false,
                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
                    port_hint: NetworkHint::Auto,
                    instantiate_fn: DebugInstantiate::Building,
                    deserialize_fn: Some(deser_fn.into()),
                    metadata: target_consistency.new_node_metadata(Stream::<
                        InT,
                        Self::DropConsistency,
                        Unbounded,
                        TotalOrder,
                        ExactlyOnce,
                    >::collection_kind(
                    )),
                },
            );

        (
            ExternalBincodeBidi {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            fwd_ref,
        )
    }
886
    /// Binds a multi-client, bidirectional connection between this location and the external
    /// `from`, framed with `Codec`. Every stream is keyed by a `u64`.
    // NOTE(review): the `u64` key presumably identifies the client connection — confirm.
    ///
    /// Returns, in order:
    /// - the [`ExternalBytesPort`] naming the external's key and the newly allocated port,
    /// - the keyed stream of items decoded by `Codec`,
    /// - the keyed stream of [`MembershipEvent`]s, and
    /// - a [`ForwardHandle`] to be completed with the keyed stream of `T` values to send back.
    ///
    /// `port_hint` is recorded on the external input node.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<Many>,
        KeyedStream<
            u64,
            <Codec as Decoder>::Item,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        KeyedStream<
            u64,
            MembershipEvent,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        ForwardHandle<
            'a,
            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
        >,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state; this borrow ends with the statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        let target_consistency = self.drop_consistency();
        // Forward reference standing in for the outbound keyed stream supplied later.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
            u64,
            T,
            Self::DropConsistency,
            Unbounded,
            NoOrder,
            ExactlyOnce,
        >>();
        // This guard is a named local, so it lives until the function returns.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outbound half: the keyed stream is turned into `(key, value)` entries and its IR
        // node is moved into the root, leaving a placeholder. No serializer is attached.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Inbound half: each element is a decode result carrying `(key, item)` on success.
        let raw_stream: Stream<
            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: target_consistency.new_node_metadata(Stream::<
                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        // The membership source is referenced purely by name, built from the external's key
        // and the port id.
        // NOTE(review): presumably the deployment code defines a variable with this name — confirm.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership is a `bool` per key; it is mapped to `MembershipEvent` below.
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // Unwraps the `Result` elements, then keys the `(key, item)` pairs by their first component.
            // NOTE(review): presumably `Err` frames are dropped by `flatten_ordered` — confirm.
            raw_stream
                .flatten_ordered()
                .into_keyed(),
            // `true` means the key joined, `false` means it left.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
1030
    /// Binds a multi-client, bidirectional bincode connection between this location and the
    /// external `from`. Every stream is keyed by a `u64`.
    // NOTE(review): the `u64` key presumably identifies the client connection — confirm.
    ///
    /// Returns, in order:
    /// - the [`ExternalBincodeBidi`] handle naming the external's key and the allocated port,
    /// - the keyed stream of `InT` values deserialized from incoming frames,
    /// - the keyed stream of [`MembershipEvent`]s, and
    /// - a [`ForwardHandle`] to be completed with the keyed stream of `OutT` values to send back.
    ///
    /// The generated serializer and deserializer both call `unwrap()`, so a bincode failure or
    /// a frame-decoding error panics in the generated code.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, Many>,
        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<
            u64,
            MembershipEvent,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        ForwardHandle<
            'a,
            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
        >,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state; this borrow ends with the statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        let target_consistency = self.drop_consistency();
        // Forward reference standing in for the outbound keyed stream supplied later.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
            u64,
            OutT,
            Self::DropConsistency,
            Unbounded,
            NoOrder,
            ExactlyOnce,
        >>();
        // This guard is a named local, so it lives until the function returns.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Serializer expression: keeps the key and bincode-encodes the value of each entry.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
            )
        };

        // Outbound half: the keyed stream is turned into `(key, value)` entries and its IR
        // node is moved into the root, leaving a placeholder.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Deserializer expression: unwraps the frame result, keeps the key and
        // bincode-decodes the payload as `InT`.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let (id, b) = res.unwrap();
                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
            }
        };

        // Inbound half: frames use `LengthDelimitedCodec` and the port hint is always `Auto`.
        let raw_stream: KeyedStream<
            u64,
            InT,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    InT,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        // The membership source is referenced purely by name, built from the external's key
        // and the port id.
        // NOTE(review): presumably the deployment code defines a variable with this name — confirm.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership is a `bool` per key; it is mapped to `MembershipEvent` below.
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        (
            ExternalBincodeBidi {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            // `true` means the key joined, `false` means it left.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
1188
    /// Registers a bidirectional sidecar on this location.
    ///
    /// The quoted `sidecar` closure is recorded in the flow state together with this
    /// location's key and a freshly allocated sidecar id. Returns:
    /// - the inbound stream of `InT`, sourced from the sidecar's stream identifier, and
    /// - a [`ForwardHandle`] to be completed with the stream of `OutT` values, which is wired
    ///   into a sink named by the sidecar's sink identifier.
    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
        &self,
        sidecar: impl QuotedWithContext<'a, F, Self>,
    ) -> (
        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + TopLevel<'a>,
    {
        let location_key = Location::id(self).key();

        // Each flow-state borrow below ends with its own statement.
        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
        // Identifiers through which the generated code refers to the sidecar's two ends.
        let (stream_ident, sink_ident) = sidecar_id.idents();

        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
        self.flow_state()
            .borrow_mut()
            .sidecars
            .push(crate::compile::builder::Sidecar::Bidi {
                location_key,
                sidecar_id,
                sidecar_closure: Box::new(sidecar_closure),
            });

        // Inbound half: a stream source that is just the sidecar's stream identifier.
        let source_expr: syn::Expr = parse_quote! {
            #stream_ident
        };
        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(source_expr.into()),
                metadata: self.new_node_metadata(Stream::<
                    InT,
                    Self,
                    Unbounded, TotalOrder, ExactlyOnce,
                >::collection_kind()),
            },
        );

        // Outbound half: a forward reference the caller completes later.
        let (fwd_ref, to_sink): (
            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
        ) = self.forward_ref();

        let sink_expr: syn::Expr = parse_quote! {
            #sink_ident
        };

        // Move the forward reference's IR node into the sink root, leaving a placeholder.
        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
        // NOTE(review): this uses `try_push_root`, whereas the external bindings in this trait
        // use `push_root` — confirm the difference is intended.
        self.flow_state()
            .borrow_mut()
            .try_push_root(HydroRoot::DestSink {
                sink: sink_expr.into(),
                input: Box::new(sink_input_ir),
                op_metadata: HydroIrOpMetadata::new(),
            });

        (inbound, fwd_ref)
    }
1305
1306 fn singleton<T>(
1326 &self,
1327 e: impl QuotedWithContext<'a, T, Self>,
1328 ) -> Singleton<T, Self::DropConsistency, Bounded>
1329 where
1330 Self: Sized,
1331 {
1332 let e = e.splice_untyped_ctx(self);
1333
1334 let target_location = self.drop_consistency();
1335 Singleton::new(
1336 target_location.clone(),
1337 HydroNode::SingletonSource {
1338 value: e.into(),
1339 first_tick_only: false,
1340 metadata: target_location.new_node_metadata(Singleton::<
1341 T,
1342 Self::DropConsistency,
1343 Bounded,
1344 >::collection_kind()),
1345 },
1346 )
1347 }
1348
1349 fn singleton_future<F>(
1372 &self,
1373 e: impl QuotedWithContext<'a, F, Self>,
1374 ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1375 where
1376 F: Future,
1377 Self: Sized,
1378 {
1379 self.singleton(e).resolve_future_blocking()
1380 }
1381
1382 fn source_interval(
1391 &self,
1392 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1393 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1394 where
1395 Self: TopLevel<'a> + Sized,
1396 {
1397 self.source_stream(q!(tokio_stream::StreamExt::map(
1398 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1399 |_| ()
1400 )))
1401 .assert_has_consistency_of_trusted(
1402 manual_proof!(),
1403 )
1404 }
1405
1406 fn source_interval_delayed(
1413 &self,
1414 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1415 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1416 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1417 where
1418 Self: TopLevel<'a> + Sized,
1419 {
1420 self.source_stream(q!(tokio_stream::StreamExt::map(
1421 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1422 tokio::time::Instant::now() + delay,
1423 interval,
1424 )),
1425 |_| ()
1426 )))
1427 .assert_has_consistency_of_trusted(
1428 manual_proof!(),
1429 )
1430 }
1431
1432 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1472 where
1473 S: CycleCollection<'a, ForwardRef, Location = Self>,
1474 {
1475 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1476 (
1477 ForwardHandle::new(cycle_id, Location::id(self)),
1478 S::create_source(cycle_id, self.clone()),
1479 )
1480 }
1481}
1482
1483#[cfg(feature = "deploy")]
1484#[cfg(test)]
1485mod tests {
1486 use std::collections::HashSet;
1487
1488 use futures::{SinkExt, StreamExt};
1489 use hydro_deploy::Deployment;
1490 use stageleft::q;
1491 use tokio_util::codec::LengthDelimitedCodec;
1492
1493 use crate::compile::builder::FlowBuilder;
1494 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1495 use crate::location::{Location, NetworkHint};
1496 use crate::nondet::nondet;
1497
    /// Checks that a top-level singleton snapshotted into a tick contributes exactly one
    /// value per batch: every input `n` must come out as `((n, 123), 1)`.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        // The singleton is created on the process itself, outside the tick.
        let singleton = node.singleton(q!(123));
        let tick = node.tick();
        // Pair each batched input with the singleton's value and with the singleton's count.
        let out = input
            .batch(&tick, nondet!())
            .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
            .cross_singleton(
                singleton
                    .snapshot(&tick, nondet!())
                    .into_stream()
                    .count(),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both ends before starting the deployment.
        let mut external_in = nodes.connect(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

        // The second send must still see a count of 1.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
    }
1540
    /// Checks that a singleton created directly on a tick contributes exactly one value per
    /// batch: every input `n` must come out as `((n, 123), 1)`.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();
        // The singleton is created on the tick, so no snapshot is needed.
        let singleton = tick.singleton(q!(123));
        // Pair each batched input with the singleton's value and with the singleton's count.
        let out = input
            .batch(&tick, nondet!())
            .cross_singleton(singleton.clone())
            .cross_singleton(singleton.into_stream().count())
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect both ends before starting the deployment.
        let mut external_in = nodes.connect(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

        // The second send must still see a count of 1.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
    }
1578
    /// Sends raw bytes in through `source_external_bytes` and checks that the same bytes come
    /// back out through a bincode external sink.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) = first_node.source_external_bytes(&external);
        let out = input.send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // `.1` keeps the second element of the connection pair, which is used as the sink below.
        let mut external_in = nodes.connect(in_port).await.1;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
    }
1606
    /// Connects two clients to one multi-client bincode port and checks that their messages
    /// arrive keyed `0` and `1` respectively.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back to the clients: complete the outbound half with an empty stream.
        complete_sink.complete(
            first_node
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Two connections to the same port; only the sending half of each is kept.
        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(123).await.unwrap();
        external_in_2.send(456).await.unwrap();

        // Compared as a set, so the arrival order of the two messages does not matter.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![(0, 123), (1, 456)].into_iter().collect()
        );
    }
1646
    /// Connects two clients but sends only on the second one, and checks that its message
    /// arrives keyed `1`.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back to the clients: complete the outbound half with an empty stream.
        complete_sink.complete(
            first_node
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The first connection is opened but never used; it stays bound to a local until the test ends.
        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in_2.send(456).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), (1, 456));
    }
1683
1684 #[tokio::test]
1685 async fn multi_external_bytes() {
1686 let mut deployment = Deployment::new();
1687
1688 let mut flow = FlowBuilder::new();
1689 let first_node = flow.process::<()>();
1690 let external = flow.external::<()>();
1691
1692 let (in_port, input, _membership, complete_sink) = first_node
1693 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1694 let out = input.entries().send_bincode_external(&external);
1695 complete_sink.complete(
1696 first_node
1697 .source_iter(q!([]))
1698 .into_keyed()
1699 .weaken_ordering(),
1700 );
1701
1702 let nodes = flow
1703 .with_process(&first_node, deployment.Localhost())
1704 .with_external(&external, deployment.Localhost())
1705 .deploy(&mut deployment);
1706
1707 deployment.deploy().await.unwrap();
1708
1709 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1710 let mut external_in_2 = nodes.connect(in_port).await.1;
1711 let external_out = nodes.connect(out).await;
1712
1713 deployment.start().await.unwrap();
1714
1715 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1716 external_in_2.send(vec![4, 5].into()).await.unwrap();
1717
1718 assert_eq!(
1719 external_out.take(2).collect::<HashSet<_>>().await,
1720 vec![
1721 (0, (&[1u8, 2, 3] as &[u8]).into()),
1722 (1, (&[4u8, 5] as &[u8]).into())
1723 ]
1724 .into_iter()
1725 .collect()
1726 );
1727 }
1728
1729 #[tokio::test]
1730 async fn single_client_external_bytes() {
1731 let mut deployment = Deployment::new();
1732 let mut flow = FlowBuilder::new();
1733 let first_node = flow.process::<()>();
1734 let external = flow.external::<()>();
1735 let (port, input, complete_sink) = first_node
1736 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1737 complete_sink.complete(input.map(q!(|data| {
1738 let mut resp: Vec<u8> = data.into();
1739 resp.push(42);
1740 resp.into() })));
1742
1743 let nodes = flow
1744 .with_process(&first_node, deployment.Localhost())
1745 .with_external(&external, deployment.Localhost())
1746 .deploy(&mut deployment);
1747
1748 deployment.deploy().await.unwrap();
1749 deployment.start().await.unwrap();
1750
1751 let (mut external_out, mut external_in) = nodes.connect(port).await;
1752
1753 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1754 assert_eq!(
1755 external_out.next().await.unwrap().unwrap(),
1756 vec![1, 2, 3, 42]
1757 );
1758 }
1759
1760 #[tokio::test]
1761 async fn echo_external_bytes() {
1762 let mut deployment = Deployment::new();
1763
1764 let mut flow = FlowBuilder::new();
1765 let first_node = flow.process::<()>();
1766 let external = flow.external::<()>();
1767
1768 let (port, input, _membership, complete_sink) = first_node
1769 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1770 complete_sink
1771 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1772
1773 let nodes = flow
1774 .with_process(&first_node, deployment.Localhost())
1775 .with_external(&external, deployment.Localhost())
1776 .deploy(&mut deployment);
1777
1778 deployment.deploy().await.unwrap();
1779
1780 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1781 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1782
1783 deployment.start().await.unwrap();
1784
1785 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1786 external_in_2.send(vec![4, 5].into()).await.unwrap();
1787
1788 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1789 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1790 }
1791
1792 #[tokio::test]
1793 async fn echo_external_bincode() {
1794 let mut deployment = Deployment::new();
1795
1796 let mut flow = FlowBuilder::new();
1797 let first_node = flow.process::<()>();
1798 let external = flow.external::<()>();
1799
1800 let (port, input, _membership, complete_sink) =
1801 first_node.bidi_external_many_bincode(&external);
1802 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1803
1804 let nodes = flow
1805 .with_process(&first_node, deployment.Localhost())
1806 .with_external(&external, deployment.Localhost())
1807 .deploy(&mut deployment);
1808
1809 deployment.deploy().await.unwrap();
1810
1811 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1812 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1813
1814 deployment.start().await.unwrap();
1815
1816 external_in_1.send("hi".to_owned()).await.unwrap();
1817 external_in_2.send("hello".to_owned()).await.unwrap();
1818
1819 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1820 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1821 }
1822
1823 #[tokio::test]
1824 async fn closure_location_name() {
1825 let mut deployment = Deployment::new();
1826 let mut flow = FlowBuilder::new();
1827
1828 enum ClosureProcess {}
1829
1830 let node = flow.process::<ClosureProcess>();
1831 let external = flow.external::<()>();
1832
1833 let (in_port, input) =
1834 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1835 let out = input.send_bincode_external(&external);
1836
1837 let nodes = flow
1838 .with_process(&node, deployment.Localhost())
1839 .with_external(&external, deployment.Localhost())
1840 .deploy(&mut deployment);
1841
1842 deployment.deploy().await.unwrap();
1843
1844 let mut external_in = nodes.connect(in_port).await;
1845 let mut external_out = nodes.connect(out).await;
1846
1847 deployment.start().await.unwrap();
1848
1849 external_in.send(42).await.unwrap();
1850 assert_eq!(external_out.next().await.unwrap(), 42);
1851 }
1852}