1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39pub struct ClosureExpr {
45 pub(crate) expr: DebugExpr,
46 pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
158 Self {
159 expr: self.expr.clone(),
160 singleton_refs: self
161 .singleton_refs
162 .iter()
163 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
164 .collect(),
165 }
166 }
167
168 pub fn transform_children(
169 &mut self,
170 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
171 seen_tees: &mut SeenSharedNodes,
172 ) {
173 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
174 transform(ref_node, seen_tees);
175 }
176 }
177
178 #[cfg(feature = "build")]
181 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
182 if self.singleton_refs.is_empty() {
183 self.expr.0.to_token_stream()
184 } else {
185 assert!(
186 ident_stack.len() >= self.singleton_refs.len(),
187 "ident_stack has {} entries but expected at least {} for singleton_refs",
188 ident_stack.len(),
189 self.singleton_refs.len()
190 );
191 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
192
193 let mut let_bindings = Vec::new();
194 for ((i, (ref_node, is_mut)), ref_ident) in
195 self.singleton_refs.iter().enumerate().zip(ref_idents)
196 {
197 let HydroNode::Reference { access_counter, .. } = ref_node else {
198 panic!("ClosureExpression expected references to `HydroNode::Reference`");
199 };
200 let group = access_counter.frozen_group();
201 let local_ident = handoff_ref_ident(i);
203 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
204 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
205 let mut_token = is_mut.then(|| quote!(mut));
206 let binding = quote! {
207 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
208 };
209 let_bindings.push(binding);
210 }
211
212 let expr = &self.expr.0;
213 quote! {
214 {
215 #( #let_bindings )*
216 #expr
217 }
218 }
219 }
220 }
221}
222
223#[derive(Clone, Hash)]
227pub struct DebugExpr(pub Box<syn::Expr>);
228
229impl serde::Serialize for DebugExpr {
230 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
231 serializer.serialize_str(&self.to_string())
232 }
233}
234
235impl From<syn::Expr> for DebugExpr {
236 fn from(expr: syn::Expr) -> Self {
237 Self(Box::new(expr))
238 }
239}
240
241impl Deref for DebugExpr {
242 type Target = syn::Expr;
243
244 fn deref(&self) -> &Self::Target {
245 &self.0
246 }
247}
248
249impl ToTokens for DebugExpr {
250 fn to_tokens(&self, tokens: &mut TokenStream) {
251 self.0.to_tokens(tokens);
252 }
253}
254
255impl Debug for DebugExpr {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 write!(f, "{}", self.0.to_token_stream())
258 }
259}
260
261impl Display for DebugExpr {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 let original = self.0.as_ref().clone();
264 let simplified = simplify_q_macro(original);
265
266 write!(f, "q!({})", quote::quote!(#simplified))
269 }
270}
271
272fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
274 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
275 && is_stageleft_runtime_support_call(&path_expr.path)
277 && let syn::Expr::Block(b) = &call.args[0]
278 && b.block.stmts.len() == 3
279 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
280 {
282 let mut e = e.clone();
283 while let syn::Expr::Block(ref mut block) = e
284 && block.block.stmts.len() == 1
285 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
286 {
287 e = inner_e;
288 }
289
290 e
291 } else {
292 expr
293 }
294}
295
296fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
297 if let Some(last_segment) = path.segments.last() {
299 let fn_name = last_segment.ident.to_string();
300 path.segments.len() > 2
301 && path.segments[0].ident == "stageleft"
302 && path.segments[1].ident == "runtime_support"
303 && fn_name.contains("_type_hint")
304 } else {
305 false
306 }
307}
308
309#[derive(Clone, PartialEq, Eq, Hash)]
313pub struct DebugType(pub Box<syn::Type>);
314
315impl From<syn::Type> for DebugType {
316 fn from(t: syn::Type) -> Self {
317 Self(Box::new(t))
318 }
319}
320
321impl Deref for DebugType {
322 type Target = syn::Type;
323
324 fn deref(&self) -> &Self::Target {
325 &self.0
326 }
327}
328
329impl ToTokens for DebugType {
330 fn to_tokens(&self, tokens: &mut TokenStream) {
331 self.0.to_tokens(tokens);
332 }
333}
334
335impl Debug for DebugType {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 write!(f, "{}", self.0.to_token_stream())
338 }
339}
340
341impl serde::Serialize for DebugType {
342 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
343 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
344 }
345}
346
347fn serialize_backtrace_as_span<S: serde::Serializer>(
348 backtrace: &Backtrace,
349 serializer: S,
350) -> Result<S::Ok, S::Error> {
351 match backtrace.format_span() {
352 Some(span) => serializer.serialize_some(&span),
353 None => serializer.serialize_none(),
354 }
355}
356
357fn serialize_ident<S: serde::Serializer>(
358 ident: &syn::Ident,
359 serializer: S,
360) -> Result<S::Ok, S::Error> {
361 serializer.serialize_str(&ident.to_string())
362}
363
364pub enum DebugInstantiate {
365 Building,
366 Finalized(Box<DebugInstantiateFinalized>),
367}
368
369impl serde::Serialize for DebugInstantiate {
370 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
371 match self {
372 DebugInstantiate::Building => {
373 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
374 }
375 DebugInstantiate::Finalized(_) => {
376 panic!(
377 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
378 )
379 }
380 }
381 }
382}
383
384#[cfg_attr(
385 not(feature = "build"),
386 expect(
387 dead_code,
388 reason = "sink, source unused without `feature = \"build\"`."
389 )
390)]
391pub struct DebugInstantiateFinalized {
392 sink: syn::Expr,
393 source: syn::Expr,
394 connect_fn: Option<Box<dyn FnOnce()>>,
395}
396
397impl From<DebugInstantiateFinalized> for DebugInstantiate {
398 fn from(f: DebugInstantiateFinalized) -> Self {
399 Self::Finalized(Box::new(f))
400 }
401}
402
403impl Debug for DebugInstantiate {
404 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405 write!(f, "<network instantiate>")
406 }
407}
408
409impl Hash for DebugInstantiate {
410 fn hash<H: Hasher>(&self, _state: &mut H) {
411 }
413}
414
415impl Clone for DebugInstantiate {
416 fn clone(&self) -> Self {
417 match self {
418 DebugInstantiate::Building => DebugInstantiate::Building,
419 DebugInstantiate::Finalized(_) => {
420 panic!("DebugInstantiate::Finalized should not be cloned")
421 }
422 }
423 }
424}
425
426#[derive(Debug, Hash, Clone, serde::Serialize)]
435pub enum ClusterMembersState {
436 Uninit,
438 Stream(DebugExpr),
441 Tee(LocationId, LocationId),
445}
446
447#[derive(Debug, Hash, Clone, serde::Serialize)]
449pub enum HydroSource {
450 Stream(DebugExpr),
451 ExternalNetwork(),
452 Iter(DebugExpr),
453 Spin(),
454 ClusterMembers(LocationId, ClusterMembersState),
455 Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
456 EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
457}
458
459#[cfg(feature = "build")]
460pub trait DfirBuilder {
466 fn singleton_intermediates(&self) -> bool;
468
469 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
471
472 #[expect(clippy::too_many_arguments, reason = "TODO")]
473 fn batch(
474 &mut self,
475 in_ident: syn::Ident,
476 in_location: &LocationId,
477 in_kind: &CollectionKind,
478 out_ident: &syn::Ident,
479 out_location: &LocationId,
480 op_meta: &HydroIrOpMetadata,
481 fold_hooked_idents: &HashSet<String>,
482 );
483 fn yield_from_tick(
484 &mut self,
485 in_ident: syn::Ident,
486 in_location: &LocationId,
487 in_kind: &CollectionKind,
488 out_ident: &syn::Ident,
489 out_location: &LocationId,
490 );
491
492 fn begin_atomic(
493 &mut self,
494 in_ident: syn::Ident,
495 in_location: &LocationId,
496 in_kind: &CollectionKind,
497 out_ident: &syn::Ident,
498 out_location: &LocationId,
499 op_meta: &HydroIrOpMetadata,
500 );
501 fn end_atomic(
502 &mut self,
503 in_ident: syn::Ident,
504 in_location: &LocationId,
505 in_kind: &CollectionKind,
506 out_ident: &syn::Ident,
507 );
508
509 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
510 fn observe_nondet(
511 &mut self,
512 trusted: bool,
513 location: &LocationId,
514 in_ident: syn::Ident,
515 in_kind: &CollectionKind,
516 out_ident: &syn::Ident,
517 out_kind: &CollectionKind,
518 op_meta: &HydroIrOpMetadata,
519 );
520
521 #[expect(clippy::too_many_arguments, reason = "TODO")]
522 fn merge_ordered(
523 &mut self,
524 location: &LocationId,
525 first_ident: syn::Ident,
526 second_ident: syn::Ident,
527 out_ident: &syn::Ident,
528 in_kind: &CollectionKind,
529 op_meta: &HydroIrOpMetadata,
530 operator_tag: Option<&str>,
531 );
532
533 #[expect(clippy::too_many_arguments, reason = "TODO")]
534 fn create_network(
535 &mut self,
536 from: &LocationId,
537 to: &LocationId,
538 input_ident: syn::Ident,
539 out_ident: &syn::Ident,
540 serialize: Option<&DebugExpr>,
541 sink: syn::Expr,
542 source: syn::Expr,
543 deserialize: Option<&DebugExpr>,
544 tag_id: StmtId,
545 networking_info: &crate::networking::NetworkingInfo,
546 );
547
548 fn create_external_source(
549 &mut self,
550 on: &LocationId,
551 source_expr: syn::Expr,
552 out_ident: &syn::Ident,
553 deserialize: Option<&DebugExpr>,
554 tag_id: StmtId,
555 );
556
557 fn create_external_output(
558 &mut self,
559 on: &LocationId,
560 sink_expr: syn::Expr,
561 input_ident: &syn::Ident,
562 serialize: Option<&DebugExpr>,
563 tag_id: StmtId,
564 );
565
566 fn emit_fold_hook(
569 &mut self,
570 location: &LocationId,
571 in_ident: &syn::Ident,
572 in_kind: &CollectionKind,
573 op_meta: &HydroIrOpMetadata,
574 ) -> Option<syn::Ident>;
575
576 fn assert_is_consistent(
580 &mut self,
581 trusted: bool,
582 location: &LocationId,
583 in_ident: syn::Ident,
584 out_ident: &syn::Ident,
585 );
586}
587
588#[cfg(feature = "build")]
589impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
590 fn singleton_intermediates(&self) -> bool {
591 false
592 }
593
594 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
595 self.entry(location.root().key())
596 .expect("location was removed")
597 .or_default()
598 }
599
600 fn batch(
601 &mut self,
602 in_ident: syn::Ident,
603 in_location: &LocationId,
604 in_kind: &CollectionKind,
605 out_ident: &syn::Ident,
606 _out_location: &LocationId,
607 _op_meta: &HydroIrOpMetadata,
608 _fold_hooked_idents: &HashSet<String>,
609 ) {
610 let builder = self.get_dfir_mut(in_location.root());
611 if in_kind.is_bounded()
612 && matches!(
613 in_kind,
614 CollectionKind::Singleton { .. }
615 | CollectionKind::Optional { .. }
616 | CollectionKind::KeyedSingleton { .. }
617 )
618 {
619 assert!(in_location.is_top_level());
620 builder.add_dfir(
621 parse_quote! {
622 #out_ident = #in_ident -> persist::<'static>();
623 },
624 None,
625 None,
626 );
627 } else {
628 builder.add_dfir(
629 parse_quote! {
630 #out_ident = #in_ident;
631 },
632 None,
633 None,
634 );
635 }
636 }
637
638 fn yield_from_tick(
639 &mut self,
640 in_ident: syn::Ident,
641 in_location: &LocationId,
642 _in_kind: &CollectionKind,
643 out_ident: &syn::Ident,
644 _out_location: &LocationId,
645 ) {
646 let builder = self.get_dfir_mut(in_location.root());
647 builder.add_dfir(
648 parse_quote! {
649 #out_ident = #in_ident;
650 },
651 None,
652 None,
653 );
654 }
655
656 fn begin_atomic(
657 &mut self,
658 in_ident: syn::Ident,
659 in_location: &LocationId,
660 _in_kind: &CollectionKind,
661 out_ident: &syn::Ident,
662 _out_location: &LocationId,
663 _op_meta: &HydroIrOpMetadata,
664 ) {
665 let builder = self.get_dfir_mut(in_location.root());
666 builder.add_dfir(
667 parse_quote! {
668 #out_ident = #in_ident;
669 },
670 None,
671 None,
672 );
673 }
674
675 fn end_atomic(
676 &mut self,
677 in_ident: syn::Ident,
678 in_location: &LocationId,
679 _in_kind: &CollectionKind,
680 out_ident: &syn::Ident,
681 ) {
682 let builder = self.get_dfir_mut(in_location.root());
683 builder.add_dfir(
684 parse_quote! {
685 #out_ident = #in_ident;
686 },
687 None,
688 None,
689 );
690 }
691
692 fn observe_nondet(
693 &mut self,
694 _trusted: bool,
695 location: &LocationId,
696 in_ident: syn::Ident,
697 _in_kind: &CollectionKind,
698 out_ident: &syn::Ident,
699 _out_kind: &CollectionKind,
700 _op_meta: &HydroIrOpMetadata,
701 ) {
702 let builder = self.get_dfir_mut(location);
703 builder.add_dfir(
704 parse_quote! {
705 #out_ident = #in_ident;
706 },
707 None,
708 None,
709 );
710 }
711
712 fn merge_ordered(
713 &mut self,
714 location: &LocationId,
715 first_ident: syn::Ident,
716 second_ident: syn::Ident,
717 out_ident: &syn::Ident,
718 _in_kind: &CollectionKind,
719 _op_meta: &HydroIrOpMetadata,
720 operator_tag: Option<&str>,
721 ) {
722 let builder = self.get_dfir_mut(location);
723 builder.add_dfir(
724 parse_quote! {
725 #out_ident = union();
726 #first_ident -> [0]#out_ident;
727 #second_ident -> [1]#out_ident;
728 },
729 None,
730 operator_tag,
731 );
732 }
733
734 fn create_network(
735 &mut self,
736 from: &LocationId,
737 to: &LocationId,
738 input_ident: syn::Ident,
739 out_ident: &syn::Ident,
740 serialize: Option<&DebugExpr>,
741 sink: syn::Expr,
742 source: syn::Expr,
743 deserialize: Option<&DebugExpr>,
744 tag_id: StmtId,
745 _networking_info: &crate::networking::NetworkingInfo,
746 ) {
747 let sender_builder = self.get_dfir_mut(from);
748 if let Some(serialize_pipeline) = serialize {
749 sender_builder.add_dfir(
750 parse_quote! {
751 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
752 },
753 None,
754 Some(&format!("send{}", tag_id)),
756 );
757 } else {
758 sender_builder.add_dfir(
759 parse_quote! {
760 #input_ident -> dest_sink(#sink);
761 },
762 None,
763 Some(&format!("send{}", tag_id)),
764 );
765 }
766
767 let receiver_builder = self.get_dfir_mut(to);
768 if let Some(deserialize_pipeline) = deserialize {
769 receiver_builder.add_dfir(
770 parse_quote! {
771 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
772 },
773 None,
774 Some(&format!("recv{}", tag_id)),
775 );
776 } else {
777 receiver_builder.add_dfir(
778 parse_quote! {
779 #out_ident = source_stream(#source);
780 },
781 None,
782 Some(&format!("recv{}", tag_id)),
783 );
784 }
785 }
786
787 fn create_external_source(
788 &mut self,
789 on: &LocationId,
790 source_expr: syn::Expr,
791 out_ident: &syn::Ident,
792 deserialize: Option<&DebugExpr>,
793 tag_id: StmtId,
794 ) {
795 let receiver_builder = self.get_dfir_mut(on);
796 if let Some(deserialize_pipeline) = deserialize {
797 receiver_builder.add_dfir(
798 parse_quote! {
799 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
800 },
801 None,
802 Some(&format!("recv{}", tag_id)),
803 );
804 } else {
805 receiver_builder.add_dfir(
806 parse_quote! {
807 #out_ident = source_stream(#source_expr);
808 },
809 None,
810 Some(&format!("recv{}", tag_id)),
811 );
812 }
813 }
814
815 fn create_external_output(
816 &mut self,
817 on: &LocationId,
818 sink_expr: syn::Expr,
819 input_ident: &syn::Ident,
820 serialize: Option<&DebugExpr>,
821 tag_id: StmtId,
822 ) {
823 let sender_builder = self.get_dfir_mut(on);
824 if let Some(serialize_fn) = serialize {
825 sender_builder.add_dfir(
826 parse_quote! {
827 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
828 },
829 None,
830 Some(&format!("send{}", tag_id)),
832 );
833 } else {
834 sender_builder.add_dfir(
835 parse_quote! {
836 #input_ident -> dest_sink(#sink_expr);
837 },
838 None,
839 Some(&format!("send{}", tag_id)),
840 );
841 }
842 }
843
844 fn emit_fold_hook(
845 &mut self,
846 _location: &LocationId,
847 _in_ident: &syn::Ident,
848 _in_kind: &CollectionKind,
849 _op_meta: &HydroIrOpMetadata,
850 ) -> Option<syn::Ident> {
851 None
852 }
853
854 fn assert_is_consistent(
855 &mut self,
856 _trusted: bool,
857 location: &LocationId,
858 in_ident: syn::Ident,
859 out_ident: &syn::Ident,
860 ) {
861 let builder = self.get_dfir_mut(location);
862 builder.add_dfir(
863 parse_quote! {
864 #out_ident = #in_ident;
865 },
866 None,
867 None,
868 );
869 }
870}
871
872#[cfg(feature = "build")]
873pub enum BuildersOrCallback<'a, L, N>
874where
875 L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
876 N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
877{
878 Builders(&'a mut dyn DfirBuilder),
879 Callback(L, N),
880}
881
882#[derive(Debug, Hash, serde::Serialize)]
886pub enum HydroRoot {
887 ForEach {
888 f: ClosureExpr,
889 input: Box<HydroNode>,
890 op_metadata: HydroIrOpMetadata,
891 },
892 SendExternal {
893 to_external_key: LocationKey,
894 to_port_id: ExternalPortId,
895 to_many: bool,
896 unpaired: bool,
897 serialize_fn: Option<DebugExpr>,
898 instantiate_fn: DebugInstantiate,
899 input: Box<HydroNode>,
900 op_metadata: HydroIrOpMetadata,
901 },
902 DestSink {
903 sink: DebugExpr,
904 input: Box<HydroNode>,
905 op_metadata: HydroIrOpMetadata,
906 },
907 CycleSink {
908 cycle_id: CycleId,
909 input: Box<HydroNode>,
910 op_metadata: HydroIrOpMetadata,
911 },
912 EmbeddedOutput {
913 #[serde(serialize_with = "serialize_ident")]
914 ident: syn::Ident,
915 input: Box<HydroNode>,
916 op_metadata: HydroIrOpMetadata,
917 },
918 Null {
919 input: Box<HydroNode>,
920 op_metadata: HydroIrOpMetadata,
921 },
922}
923
924impl HydroRoot {
925 #[cfg(feature = "build")]
926 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
927 pub fn compile_network<'a, D>(
928 &mut self,
929 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
930 seen_tees: &mut SeenSharedNodes,
931 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
932 processes: &SparseSecondaryMap<LocationKey, D::Process>,
933 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
934 externals: &SparseSecondaryMap<LocationKey, D::External>,
935 env: &mut D::InstantiateEnv,
936 ) where
937 D: Deploy<'a>,
938 {
939 let refcell_extra_stmts = RefCell::new(extra_stmts);
940 let refcell_env = RefCell::new(env);
941 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
942 self.transform_bottom_up(
943 &mut |l| {
944 if let HydroRoot::SendExternal {
945 input,
946 to_external_key,
947 to_port_id,
948 to_many,
949 unpaired,
950 instantiate_fn,
951 ..
952 } = l
953 {
954 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
955 DebugInstantiate::Building => {
956 let to_node = externals
957 .get(*to_external_key)
958 .unwrap_or_else(|| {
959 panic!("A external used in the graph was not instantiated: {}", to_external_key)
960 })
961 .clone();
962
963 match input.metadata().location_id.root() {
964 &LocationId::Process(process_key) => {
965 if *to_many {
966 (
967 (
968 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
969 parse_quote!(DUMMY),
970 ),
971 Box::new(|| {}) as Box<dyn FnOnce()>,
972 )
973 } else {
974 let from_node = processes
975 .get(process_key)
976 .unwrap_or_else(|| {
977 panic!("A process used in the graph was not instantiated: {}", process_key)
978 })
979 .clone();
980
981 let sink_port = from_node.next_port();
982 let source_port = to_node.next_port();
983
984 if *unpaired {
985 use stageleft::quote_type;
986 use tokio_util::codec::LengthDelimitedCodec;
987
988 to_node.register(*to_port_id, source_port.clone());
989
990 let _ = D::e2o_source(
991 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
992 &to_node, &source_port,
993 &from_node, &sink_port,
994 "e_type::<LengthDelimitedCodec>(),
995 format!("{}_{}", *to_external_key, *to_port_id)
996 );
997 }
998
999 (
1000 (
1001 D::o2e_sink(
1002 &from_node,
1003 &sink_port,
1004 &to_node,
1005 &source_port,
1006 format!("{}_{}", *to_external_key, *to_port_id)
1007 ),
1008 parse_quote!(DUMMY),
1009 ),
1010 if *unpaired {
1011 D::e2o_connect(
1012 &to_node,
1013 &source_port,
1014 &from_node,
1015 &sink_port,
1016 *to_many,
1017 NetworkHint::Auto,
1018 )
1019 } else {
1020 Box::new(|| {}) as Box<dyn FnOnce()>
1021 },
1022 )
1023 }
1024 }
1025 LocationId::Cluster(cluster_key) => {
1026 let from_node = clusters
1027 .get(*cluster_key)
1028 .unwrap_or_else(|| {
1029 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1030 })
1031 .clone();
1032
1033 let sink_port = from_node.next_port();
1034 let source_port = to_node.next_port();
1035
1036 if *unpaired {
1037 to_node.register(*to_port_id, source_port.clone());
1038 }
1039
1040 (
1041 (
1042 D::m2e_sink(
1043 &from_node,
1044 &sink_port,
1045 &to_node,
1046 &source_port,
1047 format!("{}_{}", *to_external_key, *to_port_id)
1048 ),
1049 parse_quote!(DUMMY),
1050 ),
1051 Box::new(|| {}) as Box<dyn FnOnce()>,
1052 )
1053 }
1054 _ => panic!()
1055 }
1056 },
1057
1058 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1059 };
1060
1061 *instantiate_fn = DebugInstantiateFinalized {
1062 sink: sink_expr,
1063 source: source_expr,
1064 connect_fn: Some(connect_fn),
1065 }
1066 .into();
1067 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1068 let element_type = match &input.metadata().collection_kind {
1069 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1070 _ => panic!("Embedded output must have Stream collection kind"),
1071 };
1072 let location_key = match input.metadata().location_id.root() {
1073 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1074 _ => panic!("Embedded output must be on a process or cluster"),
1075 };
1076 D::register_embedded_output(
1077 &mut refcell_env.borrow_mut(),
1078 location_key,
1079 ident,
1080 &element_type,
1081 );
1082 }
1083 },
1084 &mut |n| {
1085 if let HydroNode::Network {
1086 name,
1087 networking_info,
1088 input,
1089 instantiate_fn,
1090 metadata,
1091 ..
1092 } = n
1093 {
1094 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1095 DebugInstantiate::Building => instantiate_network::<D>(
1096 &mut refcell_env.borrow_mut(),
1097 input.metadata().location_id.root(),
1098 metadata.location_id.root(),
1099 processes,
1100 clusters,
1101 name.as_deref(),
1102 networking_info,
1103 ),
1104
1105 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1106 };
1107
1108 *instantiate_fn = DebugInstantiateFinalized {
1109 sink: sink_expr,
1110 source: source_expr,
1111 connect_fn: Some(connect_fn),
1112 }
1113 .into();
1114 } else if let HydroNode::ExternalInput {
1115 from_external_key,
1116 from_port_id,
1117 from_many,
1118 codec_type,
1119 port_hint,
1120 instantiate_fn,
1121 metadata,
1122 ..
1123 } = n
1124 {
1125 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1126 DebugInstantiate::Building => {
1127 let from_node = externals
1128 .get(*from_external_key)
1129 .unwrap_or_else(|| {
1130 panic!(
1131 "A external used in the graph was not instantiated: {}",
1132 from_external_key,
1133 )
1134 })
1135 .clone();
1136
1137 match metadata.location_id.root() {
1138 &LocationId::Process(process_key) => {
1139 let to_node = processes
1140 .get(process_key)
1141 .unwrap_or_else(|| {
1142 panic!("A process used in the graph was not instantiated: {}", process_key)
1143 })
1144 .clone();
1145
1146 let sink_port = from_node.next_port();
1147 let source_port = to_node.next_port();
1148
1149 from_node.register(*from_port_id, sink_port.clone());
1150
1151 (
1152 (
1153 parse_quote!(DUMMY),
1154 if *from_many {
1155 D::e2o_many_source(
1156 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1157 &to_node, &source_port,
1158 codec_type.0.as_ref(),
1159 format!("{}_{}", *from_external_key, *from_port_id)
1160 )
1161 } else {
1162 D::e2o_source(
1163 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1164 &from_node, &sink_port,
1165 &to_node, &source_port,
1166 codec_type.0.as_ref(),
1167 format!("{}_{}", *from_external_key, *from_port_id)
1168 )
1169 },
1170 ),
1171 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1172 )
1173 }
1174 LocationId::Cluster(cluster_key) => {
1175 let to_node = clusters
1176 .get(*cluster_key)
1177 .unwrap_or_else(|| {
1178 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1179 })
1180 .clone();
1181
1182 let sink_port = from_node.next_port();
1183 let source_port = to_node.next_port();
1184
1185 from_node.register(*from_port_id, sink_port.clone());
1186
1187 (
1188 (
1189 parse_quote!(DUMMY),
1190 D::e2m_source(
1191 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1192 &from_node, &sink_port,
1193 &to_node, &source_port,
1194 codec_type.0.as_ref(),
1195 format!("{}_{}", *from_external_key, *from_port_id)
1196 ),
1197 ),
1198 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1199 )
1200 }
1201 _ => panic!()
1202 }
1203 },
1204
1205 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1206 };
1207
1208 *instantiate_fn = DebugInstantiateFinalized {
1209 sink: sink_expr,
1210 source: source_expr,
1211 connect_fn: Some(connect_fn),
1212 }
1213 .into();
1214 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1215 let element_type = match &metadata.collection_kind {
1216 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1217 _ => panic!("Embedded source must have Stream collection kind"),
1218 };
1219 let location_key = match metadata.location_id.root() {
1220 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1221 _ => panic!("Embedded source must be on a process or cluster"),
1222 };
1223 D::register_embedded_stream_input(
1224 &mut refcell_env.borrow_mut(),
1225 location_key,
1226 ident,
1227 &element_type,
1228 );
1229 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1230 let element_type = match &metadata.collection_kind {
1231 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1232 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1233 };
1234 let location_key = match metadata.location_id.root() {
1235 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1236 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1237 };
1238 D::register_embedded_singleton_input(
1239 &mut refcell_env.borrow_mut(),
1240 location_key,
1241 ident,
1242 &element_type,
1243 );
1244 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1245 match state {
1246 ClusterMembersState::Uninit => {
1247 let at_location = metadata.location_id.root().clone();
1248 let key = (at_location.clone(), location_id.key());
1249 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1250 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1252 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1253 &(),
1254 );
1255 *state = ClusterMembersState::Stream(expr.into());
1256 } else {
1257 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1259 }
1260 }
1261 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1262 panic!("cluster members already finalized");
1263 }
1264 }
1265 }
1266 },
1267 seen_tees,
1268 false,
1269 );
1270 }
1271
1272 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1273 self.transform_bottom_up(
1274 &mut |l| {
1275 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1276 match instantiate_fn {
1277 DebugInstantiate::Building => panic!("network not built"),
1278
1279 DebugInstantiate::Finalized(finalized) => {
1280 (finalized.connect_fn.take().unwrap())();
1281 }
1282 }
1283 }
1284 },
1285 &mut |n| {
1286 if let HydroNode::Network { instantiate_fn, .. }
1287 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1288 {
1289 match instantiate_fn {
1290 DebugInstantiate::Building => panic!("network not built"),
1291
1292 DebugInstantiate::Finalized(finalized) => {
1293 (finalized.connect_fn.take().unwrap())();
1294 }
1295 }
1296 }
1297 },
1298 seen_tees,
1299 false,
1300 );
1301 }
1302
1303 pub fn transform_bottom_up(
1304 &mut self,
1305 transform_root: &mut impl FnMut(&mut HydroRoot),
1306 transform_node: &mut impl FnMut(&mut HydroNode),
1307 seen_tees: &mut SeenSharedNodes,
1308 check_well_formed: bool,
1309 ) {
1310 self.transform_children(
1311 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1312 seen_tees,
1313 );
1314
1315 transform_root(self);
1316 }
1317
1318 pub fn transform_children(
1319 &mut self,
1320 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1321 seen_tees: &mut SeenSharedNodes,
1322 ) {
1323 match self {
1324 HydroRoot::ForEach { f, input, .. } => {
1325 f.transform_children(&mut transform, seen_tees);
1326 transform(input, seen_tees);
1327 }
1328 HydroRoot::SendExternal { input, .. }
1329 | HydroRoot::DestSink { input, .. }
1330 | HydroRoot::CycleSink { input, .. }
1331 | HydroRoot::EmbeddedOutput { input, .. }
1332 | HydroRoot::Null { input, .. } => {
1333 transform(input, seen_tees);
1334 }
1335 }
1336 }
1337
1338 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1339 match self {
1340 HydroRoot::ForEach {
1341 f,
1342 input,
1343 op_metadata,
1344 } => HydroRoot::ForEach {
1345 f: f.deep_clone(seen_tees),
1346 input: Box::new(input.deep_clone(seen_tees)),
1347 op_metadata: op_metadata.clone(),
1348 },
1349 HydroRoot::SendExternal {
1350 to_external_key,
1351 to_port_id,
1352 to_many,
1353 unpaired,
1354 serialize_fn,
1355 instantiate_fn,
1356 input,
1357 op_metadata,
1358 } => HydroRoot::SendExternal {
1359 to_external_key: *to_external_key,
1360 to_port_id: *to_port_id,
1361 to_many: *to_many,
1362 unpaired: *unpaired,
1363 serialize_fn: serialize_fn.clone(),
1364 instantiate_fn: instantiate_fn.clone(),
1365 input: Box::new(input.deep_clone(seen_tees)),
1366 op_metadata: op_metadata.clone(),
1367 },
1368 HydroRoot::DestSink {
1369 sink,
1370 input,
1371 op_metadata,
1372 } => HydroRoot::DestSink {
1373 sink: sink.clone(),
1374 input: Box::new(input.deep_clone(seen_tees)),
1375 op_metadata: op_metadata.clone(),
1376 },
1377 HydroRoot::CycleSink {
1378 cycle_id,
1379 input,
1380 op_metadata,
1381 } => HydroRoot::CycleSink {
1382 cycle_id: *cycle_id,
1383 input: Box::new(input.deep_clone(seen_tees)),
1384 op_metadata: op_metadata.clone(),
1385 },
1386 HydroRoot::EmbeddedOutput {
1387 ident,
1388 input,
1389 op_metadata,
1390 } => HydroRoot::EmbeddedOutput {
1391 ident: ident.clone(),
1392 input: Box::new(input.deep_clone(seen_tees)),
1393 op_metadata: op_metadata.clone(),
1394 },
1395 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1396 input: Box::new(input.deep_clone(seen_tees)),
1397 op_metadata: op_metadata.clone(),
1398 },
1399 }
1400 }
1401
1402 #[cfg(feature = "build")]
1403 pub fn emit(
1404 &mut self,
1405 graph_builders: &mut dyn DfirBuilder,
1406 seen_tees: &mut SeenSharedNodes,
1407 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1408 next_stmt_id: &mut crate::Counter<StmtId>,
1409 fold_hooked_idents: &mut HashSet<String>,
1410 ) {
1411 self.emit_core(
1412 &mut BuildersOrCallback::<
1413 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1414 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1415 >::Builders(graph_builders),
1416 seen_tees,
1417 built_tees,
1418 next_stmt_id,
1419 fold_hooked_idents,
1420 );
1421 }
1422
1423 #[cfg(feature = "build")]
1424 pub fn emit_core(
1425 &mut self,
1426 builders_or_callback: &mut BuildersOrCallback<
1427 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1428 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1429 >,
1430 seen_tees: &mut SeenSharedNodes,
1431 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1432 next_stmt_id: &mut crate::Counter<StmtId>,
1433 fold_hooked_idents: &mut HashSet<String>,
1434 ) {
1435 match self {
1436 HydroRoot::ForEach { f, input, .. } => {
1437 let input_ident = input.emit_core(
1438 builders_or_callback,
1439 seen_tees,
1440 built_tees,
1441 next_stmt_id,
1442 fold_hooked_idents,
1443 );
1444
1445 let stmt_id = next_stmt_id.get_and_increment();
1446
1447 match builders_or_callback {
1448 BuildersOrCallback::Builders(graph_builders) => {
1449 let mut ident_stack: Vec<syn::Ident> = Vec::new();
1450
1451 for (ref_node, _is_mut) in f.singleton_refs.iter() {
1453 let HydroNode::Reference { inner, .. } = ref_node else {
1454 panic!("singleton_refs should only contain HydroNode::Reference");
1455 };
1456 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1457 let idents = built_tees.get(&ptr).expect(
1458 "ForEach singleton ref not found in built_tees — ref node was not emitted",
1459 );
1460 ident_stack.push(idents[0].clone());
1461 }
1462
1463 let f_tokens = f.emit_tokens(&mut ident_stack);
1464
1465 graph_builders
1466 .get_dfir_mut(&input.metadata().location_id)
1467 .add_dfir(
1468 parse_quote! {
1469 #input_ident -> for_each(#f_tokens);
1470 },
1471 None,
1472 Some(&stmt_id.to_string()),
1473 );
1474 }
1475 BuildersOrCallback::Callback(leaf_callback, _) => {
1476 leaf_callback(self, next_stmt_id);
1477 }
1478 }
1479 }
1480
1481 HydroRoot::SendExternal {
1482 serialize_fn,
1483 instantiate_fn,
1484 input,
1485 ..
1486 } => {
1487 let input_ident = input.emit_core(
1488 builders_or_callback,
1489 seen_tees,
1490 built_tees,
1491 next_stmt_id,
1492 fold_hooked_idents,
1493 );
1494
1495 let stmt_id = next_stmt_id.get_and_increment();
1496
1497 match builders_or_callback {
1498 BuildersOrCallback::Builders(graph_builders) => {
1499 let (sink_expr, _) = match instantiate_fn {
1500 DebugInstantiate::Building => (
1501 syn::parse_quote!(DUMMY_SINK),
1502 syn::parse_quote!(DUMMY_SOURCE),
1503 ),
1504
1505 DebugInstantiate::Finalized(finalized) => {
1506 (finalized.sink.clone(), finalized.source.clone())
1507 }
1508 };
1509
1510 graph_builders.create_external_output(
1511 &input.metadata().location_id,
1512 sink_expr,
1513 &input_ident,
1514 serialize_fn.as_ref(),
1515 stmt_id,
1516 );
1517 }
1518 BuildersOrCallback::Callback(leaf_callback, _) => {
1519 leaf_callback(self, next_stmt_id);
1520 }
1521 }
1522 }
1523
1524 HydroRoot::DestSink { sink, input, .. } => {
1525 let input_ident = input.emit_core(
1526 builders_or_callback,
1527 seen_tees,
1528 built_tees,
1529 next_stmt_id,
1530 fold_hooked_idents,
1531 );
1532
1533 let stmt_id = next_stmt_id.get_and_increment();
1534
1535 match builders_or_callback {
1536 BuildersOrCallback::Builders(graph_builders) => {
1537 graph_builders
1538 .get_dfir_mut(&input.metadata().location_id)
1539 .add_dfir(
1540 parse_quote! {
1541 #input_ident -> dest_sink(#sink);
1542 },
1543 None,
1544 Some(&stmt_id.to_string()),
1545 );
1546 }
1547 BuildersOrCallback::Callback(leaf_callback, _) => {
1548 leaf_callback(self, next_stmt_id);
1549 }
1550 }
1551 }
1552
1553 HydroRoot::CycleSink {
1554 cycle_id, input, ..
1555 } => {
1556 let input_ident = input.emit_core(
1557 builders_or_callback,
1558 seen_tees,
1559 built_tees,
1560 next_stmt_id,
1561 fold_hooked_idents,
1562 );
1563
1564 match builders_or_callback {
1565 BuildersOrCallback::Builders(graph_builders) => {
1566 let elem_type: syn::Type = match &input.metadata().collection_kind {
1567 CollectionKind::KeyedSingleton {
1568 key_type,
1569 value_type,
1570 ..
1571 }
1572 | CollectionKind::KeyedStream {
1573 key_type,
1574 value_type,
1575 ..
1576 } => {
1577 parse_quote!((#key_type, #value_type))
1578 }
1579 CollectionKind::Stream { element_type, .. }
1580 | CollectionKind::Singleton { element_type, .. }
1581 | CollectionKind::Optional { element_type, .. } => {
1582 parse_quote!(#element_type)
1583 }
1584 };
1585
1586 let cycle_id_ident = cycle_id.as_ident();
1587 graph_builders
1588 .get_dfir_mut(&input.metadata().location_id)
1589 .add_dfir(
1590 parse_quote! {
1591 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1592 },
1593 None,
1594 None,
1595 );
1596 }
1597 BuildersOrCallback::Callback(_, _) => {}
1599 }
1600 }
1601
1602 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1603 let input_ident = input.emit_core(
1604 builders_or_callback,
1605 seen_tees,
1606 built_tees,
1607 next_stmt_id,
1608 fold_hooked_idents,
1609 );
1610
1611 let stmt_id = next_stmt_id.get_and_increment();
1612
1613 match builders_or_callback {
1614 BuildersOrCallback::Builders(graph_builders) => {
1615 graph_builders
1616 .get_dfir_mut(&input.metadata().location_id)
1617 .add_dfir(
1618 parse_quote! {
1619 #input_ident -> for_each(&mut #ident);
1620 },
1621 None,
1622 Some(&stmt_id.to_string()),
1623 );
1624 }
1625 BuildersOrCallback::Callback(leaf_callback, _) => {
1626 leaf_callback(self, next_stmt_id);
1627 }
1628 }
1629 }
1630
1631 HydroRoot::Null { input, .. } => {
1632 let input_ident = input.emit_core(
1633 builders_or_callback,
1634 seen_tees,
1635 built_tees,
1636 next_stmt_id,
1637 fold_hooked_idents,
1638 );
1639
1640 let stmt_id = next_stmt_id.get_and_increment();
1641
1642 match builders_or_callback {
1643 BuildersOrCallback::Builders(graph_builders) => {
1644 graph_builders
1645 .get_dfir_mut(&input.metadata().location_id)
1646 .add_dfir(
1647 parse_quote! {
1648 #input_ident -> for_each(|_| {});
1649 },
1650 None,
1651 Some(&stmt_id.to_string()),
1652 );
1653 }
1654 BuildersOrCallback::Callback(leaf_callback, _) => {
1655 leaf_callback(self, next_stmt_id);
1656 }
1657 }
1658 }
1659 }
1660 }
1661
1662 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1663 match self {
1664 HydroRoot::ForEach { op_metadata, .. }
1665 | HydroRoot::SendExternal { op_metadata, .. }
1666 | HydroRoot::DestSink { op_metadata, .. }
1667 | HydroRoot::CycleSink { op_metadata, .. }
1668 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1669 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1670 }
1671 }
1672
1673 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1674 match self {
1675 HydroRoot::ForEach { op_metadata, .. }
1676 | HydroRoot::SendExternal { op_metadata, .. }
1677 | HydroRoot::DestSink { op_metadata, .. }
1678 | HydroRoot::CycleSink { op_metadata, .. }
1679 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1680 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1681 }
1682 }
1683
1684 pub fn input(&self) -> &HydroNode {
1685 match self {
1686 HydroRoot::ForEach { input, .. }
1687 | HydroRoot::SendExternal { input, .. }
1688 | HydroRoot::DestSink { input, .. }
1689 | HydroRoot::CycleSink { input, .. }
1690 | HydroRoot::EmbeddedOutput { input, .. }
1691 | HydroRoot::Null { input, .. } => input,
1692 }
1693 }
1694
1695 pub fn input_metadata(&self) -> &HydroIrMetadata {
1696 self.input().metadata()
1697 }
1698
1699 pub fn print_root(&self) -> String {
1700 match self {
1701 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1702 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1703 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1704 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1705 HydroRoot::EmbeddedOutput { ident, .. } => {
1706 format!("EmbeddedOutput({})", ident)
1707 }
1708 HydroRoot::Null { .. } => "Null".to_owned(),
1709 }
1710 }
1711
1712 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1713 match self {
1714 HydroRoot::ForEach { f, .. } => {
1715 transform(&mut f.expr);
1716 }
1717 HydroRoot::DestSink { sink, .. } => {
1718 transform(sink);
1719 }
1720 HydroRoot::SendExternal { .. }
1721 | HydroRoot::CycleSink { .. }
1722 | HydroRoot::EmbeddedOutput { .. }
1723 | HydroRoot::Null { .. } => {}
1724 }
1725 }
1726}
1727
1728#[cfg(feature = "build")]
1729fn tick_of(loc: &LocationId) -> Option<ClockId> {
1730 match loc {
1731 LocationId::Tick(id, _) => Some(*id),
1732 LocationId::Atomic(inner) => tick_of(inner),
1733 _ => None,
1734 }
1735}
1736
1737#[cfg(feature = "build")]
1738fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1739 match loc {
1740 LocationId::Tick(id, inner) => {
1741 *id = uf_find(uf, *id);
1742 remap_location(inner, uf);
1743 }
1744 LocationId::Atomic(inner) => {
1745 remap_location(inner, uf);
1746 }
1747 LocationId::Process(_) | LocationId::Cluster(_) => {}
1748 }
1749}
1750
1751#[cfg(feature = "build")]
1752fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1753 let p = *parent.get(&x).unwrap_or(&x);
1754 if p == x {
1755 return x;
1756 }
1757 let root = uf_find(parent, p);
1758 parent.insert(x, root);
1759 root
1760}
1761
1762#[cfg(feature = "build")]
1763fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1764 let ra = uf_find(parent, a);
1765 let rb = uf_find(parent, b);
1766 if ra != rb {
1767 parent.insert(ra, rb);
1768 }
1769}
1770
1771#[cfg(feature = "build")]
1775pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1776 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1777
1778 transform_bottom_up(
1780 ir,
1781 &mut |_| {},
1782 &mut |node: &mut HydroNode| match node {
1783 HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1784 if let (Some(a), Some(b)) = (
1785 tick_of(&inner.metadata().location_id),
1786 tick_of(&metadata.location_id),
1787 ) {
1788 uf_union(&mut uf, a, b);
1789 }
1790 }
1791 HydroNode::Chain {
1792 first,
1793 second,
1794 metadata,
1795 }
1796 | HydroNode::ChainFirst {
1797 first,
1798 second,
1799 metadata,
1800 }
1801 | HydroNode::MergeOrdered {
1802 first,
1803 second,
1804 metadata,
1805 } => {
1806 if let (Some(a), Some(b)) = (
1807 tick_of(&first.metadata().location_id),
1808 tick_of(&metadata.location_id),
1809 ) {
1810 uf_union(&mut uf, a, b);
1811 }
1812 if let (Some(a), Some(b)) = (
1813 tick_of(&second.metadata().location_id),
1814 tick_of(&metadata.location_id),
1815 ) {
1816 uf_union(&mut uf, a, b);
1817 }
1818 }
1819 _ => {}
1820 },
1821 false,
1822 );
1823
1824 transform_bottom_up(
1826 ir,
1827 &mut |_| {},
1828 &mut |node: &mut HydroNode| {
1829 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1830 },
1831 false,
1832 );
1833}
1834
1835#[cfg(feature = "build")]
1836pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1837 let mut builders = SecondaryMap::new();
1838 let mut seen_tees = HashMap::new();
1839 let mut built_tees = HashMap::new();
1840 let mut next_stmt_id = crate::Counter::<StmtId>::default();
1841 let mut fold_hooked_idents = HashSet::new();
1842 for leaf in ir {
1843 leaf.emit(
1844 &mut builders,
1845 &mut seen_tees,
1846 &mut built_tees,
1847 &mut next_stmt_id,
1848 &mut fold_hooked_idents,
1849 );
1850 }
1851 builders
1852}
1853
1854#[cfg(feature = "build")]
1855pub fn traverse_dfir(
1856 ir: &mut [HydroRoot],
1857 transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1858 transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1859) {
1860 let mut seen_tees = HashMap::new();
1861 let mut built_tees = HashMap::new();
1862 let mut next_stmt_id = crate::Counter::<StmtId>::default();
1863 let mut fold_hooked_idents = HashSet::new();
1864 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1865 ir.iter_mut().for_each(|leaf| {
1866 leaf.emit_core(
1867 &mut callback,
1868 &mut seen_tees,
1869 &mut built_tees,
1870 &mut next_stmt_id,
1871 &mut fold_hooked_idents,
1872 );
1873 });
1874}
1875
1876pub fn transform_bottom_up(
1877 ir: &mut [HydroRoot],
1878 transform_root: &mut impl FnMut(&mut HydroRoot),
1879 transform_node: &mut impl FnMut(&mut HydroNode),
1880 check_well_formed: bool,
1881) {
1882 let mut seen_tees = HashMap::new();
1883 ir.iter_mut().for_each(|leaf| {
1884 leaf.transform_bottom_up(
1885 transform_root,
1886 transform_node,
1887 &mut seen_tees,
1888 check_well_formed,
1889 );
1890 });
1891}
1892
1893pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1894 let mut seen_tees = HashMap::new();
1895 ir.iter()
1896 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1897 .collect()
1898}
1899
1900type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1901thread_local! {
1902 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1903 static SERIALIZED_SHARED: PrintedTees
1907 = const { RefCell::new(None) };
1908}
1909
1910pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1911 PRINTED_TEES.with(|printed_tees| {
1912 let mut printed_tees_mut = printed_tees.borrow_mut();
1913 *printed_tees_mut = Some((0, HashMap::new()));
1914 drop(printed_tees_mut);
1915
1916 let ret = f();
1917
1918 let mut printed_tees_mut = printed_tees.borrow_mut();
1919 *printed_tees_mut = None;
1920
1921 ret
1922 })
1923}
1924
1925pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1930 let _guard = SerializedSharedGuard::enter();
1931 f()
1932}
1933
1934struct SerializedSharedGuard {
1937 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1938}
1939
1940impl SerializedSharedGuard {
1941 fn enter() -> Self {
1942 let previous = SERIALIZED_SHARED.with(|cell| {
1943 let mut guard = cell.borrow_mut();
1944 guard.replace((0, HashMap::new()))
1945 });
1946 Self { previous }
1947 }
1948}
1949
1950impl Drop for SerializedSharedGuard {
1951 fn drop(&mut self) {
1952 SERIALIZED_SHARED.with(|cell| {
1953 *cell.borrow_mut() = self.previous.take();
1954 });
1955 }
1956}
1957
1958pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1959
1960impl serde::Serialize for SharedNode {
1961 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1972 SERIALIZED_SHARED.with(|cell| {
1973 let mut guard = cell.borrow_mut();
1974 let state = guard.as_mut().ok_or_else(|| {
1976 serde::ser::Error::custom(
1977 "SharedNode serialization requires an active serialize_dedup_shared scope",
1978 )
1979 })?;
1980 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1981
1982 if let Some(&id) = state.1.get(&ptr) {
1983 drop(guard);
1984 use serde::ser::SerializeMap;
1985 let mut map = serializer.serialize_map(Some(1))?;
1986 map.serialize_entry("$shared_ref", &id)?;
1987 map.end()
1988 } else {
1989 let id = state.0;
1990 state.0 += 1;
1991 state.1.insert(ptr, id);
1992 drop(guard);
1993
1994 use serde::ser::SerializeMap;
1995 let mut map = serializer.serialize_map(Some(2))?;
1996 map.serialize_entry("$shared", &id)?;
1997 map.serialize_entry("node", &*self.0.borrow())?;
1998 map.end()
1999 }
2000 })
2001 }
2002}
2003
2004impl SharedNode {
2005 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2006 Rc::as_ptr(&self.0)
2007 }
2008}
2009
2010impl Debug for SharedNode {
2011 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2012 PRINTED_TEES.with(|printed_tees| {
2013 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2014 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2015
2016 if let Some(printed_tees_mut) = printed_tees_mut {
2017 if let Some(existing) = printed_tees_mut
2018 .1
2019 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2020 {
2021 write!(f, "<shared {}>", existing)
2022 } else {
2023 let next_id = printed_tees_mut.0;
2024 printed_tees_mut.0 += 1;
2025 printed_tees_mut
2026 .1
2027 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2028 drop(printed_tees_mut_borrow);
2029 write!(f, "<shared {}>: ", next_id)?;
2030 Debug::fmt(&self.0.borrow(), f)
2031 }
2032 } else {
2033 drop(printed_tees_mut_borrow);
2034 write!(f, "<shared>: ")?;
2035 Debug::fmt(&self.0.borrow(), f)
2036 }
2037 })
2038 }
2039}
2040
2041impl Hash for SharedNode {
2042 fn hash<H: Hasher>(&self, state: &mut H) {
2043 self.0.borrow_mut().hash(state);
2044 }
2045}
2046
2047#[derive(Debug)]
2052pub enum AccessCounter {
2053 Counting(Cell<u32>),
2054 Frozen(u32),
2055}
2056
2057impl AccessCounter {
2058 pub fn new() -> Self {
2059 Self::Counting(Cell::new(0))
2060 }
2061
2062 pub fn next_group(&self, is_mut: bool) -> Self {
2066 let AccessCounter::Counting(count) = self else {
2067 panic!("Cannot count on `AccessCounter::Frozen`");
2068 };
2069 let c = if is_mut {
2070 let c = count.get() + 1;
2071 count.set(c + 1);
2072 c
2073 } else {
2074 count.get()
2075 };
2076 Self::Frozen(c)
2077 }
2078
2079 pub fn freeze(&self) -> Self {
2081 Self::Frozen(match self {
2082 Self::Counting(count) => count.get(),
2083 Self::Frozen(count) => *count,
2084 })
2085 }
2086
2087 pub fn frozen_group(&self) -> u32 {
2088 let Self::Frozen(count) = self else {
2089 panic!("`AccessCounter` not frozen");
2090 };
2091 *count
2092 }
2093}
2094
2095impl Default for AccessCounter {
2096 fn default() -> Self {
2097 Self::new()
2098 }
2099}
2100
2101impl Hash for AccessCounter {
2102 fn hash<H: Hasher>(&self, _state: &mut H) {
2103 }
2105}
2106
2107impl serde::Serialize for AccessCounter {
2108 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2109 let count = match self {
2110 AccessCounter::Counting(count) => count.get(),
2111 AccessCounter::Frozen(count) => *count,
2112 };
2113 count.serialize(serializer)
2114 }
2115}
2116
2117#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2118pub enum BoundKind {
2119 Unbounded,
2120 Bounded,
2121}
2122
2123#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2124pub enum StreamOrder {
2125 NoOrder,
2126 TotalOrder,
2127}
2128
2129#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2130pub enum StreamRetry {
2131 AtLeastOnce,
2132 ExactlyOnce,
2133}
2134
2135#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2136pub enum KeyedSingletonBoundKind {
2137 Unbounded,
2138 MonotonicKeys,
2139 MonotonicValue,
2140 BoundedValue,
2141 Bounded,
2142}
2143
2144#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2145pub enum SingletonBoundKind {
2146 Unbounded,
2147 Monotonic,
2148 Bounded,
2149}
2150
2151#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2152pub enum CollectionKind {
2153 Stream {
2154 bound: BoundKind,
2155 order: StreamOrder,
2156 retry: StreamRetry,
2157 element_type: DebugType,
2158 },
2159 Singleton {
2160 bound: SingletonBoundKind,
2161 element_type: DebugType,
2162 },
2163 Optional {
2164 bound: BoundKind,
2165 element_type: DebugType,
2166 },
2167 KeyedStream {
2168 bound: BoundKind,
2169 value_order: StreamOrder,
2170 value_retry: StreamRetry,
2171 key_type: DebugType,
2172 value_type: DebugType,
2173 },
2174 KeyedSingleton {
2175 bound: KeyedSingletonBoundKind,
2176 key_type: DebugType,
2177 value_type: DebugType,
2178 },
2179}
2180
2181impl CollectionKind {
2182 pub fn is_bounded(&self) -> bool {
2183 matches!(
2184 self,
2185 CollectionKind::Stream {
2186 bound: BoundKind::Bounded,
2187 ..
2188 } | CollectionKind::Singleton {
2189 bound: SingletonBoundKind::Bounded,
2190 ..
2191 } | CollectionKind::Optional {
2192 bound: BoundKind::Bounded,
2193 ..
2194 } | CollectionKind::KeyedStream {
2195 bound: BoundKind::Bounded,
2196 ..
2197 } | CollectionKind::KeyedSingleton {
2198 bound: KeyedSingletonBoundKind::Bounded,
2199 ..
2200 }
2201 )
2202 }
2203}
2204
2205#[derive(Clone, serde::Serialize)]
2206pub struct HydroIrMetadata {
2207 pub location_id: LocationId,
2208 pub collection_kind: CollectionKind,
2209 pub consistency: Option<ClusterConsistency>,
2210 pub cardinality: Option<usize>,
2211 pub tag: Option<String>,
2212 pub op: HydroIrOpMetadata,
2213}
2214
2215impl Hash for HydroIrMetadata {
2217 fn hash<H: Hasher>(&self, _: &mut H) {}
2218}
2219
2220impl PartialEq for HydroIrMetadata {
2221 fn eq(&self, _: &Self) -> bool {
2222 true
2223 }
2224}
2225
2226impl Eq for HydroIrMetadata {}
2227
2228impl Debug for HydroIrMetadata {
2229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2230 f.debug_struct("HydroIrMetadata")
2231 .field("location_id", &self.location_id)
2232 .field("collection_kind", &self.collection_kind)
2233 .finish()
2234 }
2235}
2236
2237#[derive(Clone, serde::Serialize)]
2240pub struct HydroIrOpMetadata {
2241 #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2242 pub backtrace: Backtrace,
2243 pub cpu_usage: Option<f64>,
2244 pub network_recv_cpu_usage: Option<f64>,
2245 pub id: Option<usize>,
2246}
2247
2248impl HydroIrOpMetadata {
2249 #[expect(
2250 clippy::new_without_default,
2251 reason = "explicit calls to new ensure correct backtrace bounds"
2252 )]
2253 pub fn new() -> HydroIrOpMetadata {
2254 Self::new_with_skip(1)
2255 }
2256
2257 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2258 HydroIrOpMetadata {
2259 backtrace: Backtrace::get_backtrace(2 + skip_count),
2260 cpu_usage: None,
2261 network_recv_cpu_usage: None,
2262 id: None,
2263 }
2264 }
2265}
2266
2267impl Debug for HydroIrOpMetadata {
2268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2269 f.debug_struct("HydroIrOpMetadata").finish()
2270 }
2271}
2272
2273impl Hash for HydroIrOpMetadata {
2274 fn hash<H: Hasher>(&self, _: &mut H) {}
2275}
2276
2277#[derive(Debug, Hash, serde::Serialize)]
2280pub enum HydroNode {
2281 Placeholder,
2282
2283 Cast {
2291 inner: Box<HydroNode>,
2292 metadata: HydroIrMetadata,
2293 },
2294
2295 ObserveNonDet {
2301 inner: Box<HydroNode>,
2302 trusted: bool, metadata: HydroIrMetadata,
2304 },
2305
2306 Source {
2307 source: HydroSource,
2308 metadata: HydroIrMetadata,
2309 },
2310
2311 SingletonSource {
2312 value: DebugExpr,
2313 first_tick_only: bool,
2314 metadata: HydroIrMetadata,
2315 },
2316
2317 CycleSource {
2318 cycle_id: CycleId,
2319 metadata: HydroIrMetadata,
2320 },
2321
2322 Tee {
2323 inner: SharedNode,
2324 metadata: HydroIrMetadata,
2325 },
2326
2327 Reference {
2336 inner: SharedNode,
2337 kind: crate::handoff_ref::HandoffRefKind,
2338 access_counter: AccessCounter,
2339 metadata: HydroIrMetadata,
2340 },
2341
2342 Partition {
2343 inner: SharedNode,
2344 f: ClosureExpr,
2345 is_true: bool,
2346 metadata: HydroIrMetadata,
2347 },
2348
2349 BeginAtomic {
2350 inner: Box<HydroNode>,
2351 metadata: HydroIrMetadata,
2352 },
2353
2354 EndAtomic {
2355 inner: Box<HydroNode>,
2356 metadata: HydroIrMetadata,
2357 },
2358
2359 Batch {
2360 inner: Box<HydroNode>,
2361 metadata: HydroIrMetadata,
2362 },
2363
2364 YieldConcat {
2365 inner: Box<HydroNode>,
2366 metadata: HydroIrMetadata,
2367 },
2368
2369 Chain {
2370 first: Box<HydroNode>,
2371 second: Box<HydroNode>,
2372 metadata: HydroIrMetadata,
2373 },
2374
2375 MergeOrdered {
2376 first: Box<HydroNode>,
2377 second: Box<HydroNode>,
2378 metadata: HydroIrMetadata,
2379 },
2380
2381 ChainFirst {
2382 first: Box<HydroNode>,
2383 second: Box<HydroNode>,
2384 metadata: HydroIrMetadata,
2385 },
2386
2387 CrossProduct {
2388 left: Box<HydroNode>,
2389 right: Box<HydroNode>,
2390 metadata: HydroIrMetadata,
2391 },
2392
2393 CrossSingleton {
2394 left: Box<HydroNode>,
2395 right: Box<HydroNode>,
2396 metadata: HydroIrMetadata,
2397 },
2398
2399 Join {
2400 left: Box<HydroNode>,
2401 right: Box<HydroNode>,
2402 metadata: HydroIrMetadata,
2403 },
2404
2405 JoinHalf {
2409 left: Box<HydroNode>,
2410 right: Box<HydroNode>,
2411 metadata: HydroIrMetadata,
2412 },
2413
2414 Difference {
2415 pos: Box<HydroNode>,
2416 neg: Box<HydroNode>,
2417 metadata: HydroIrMetadata,
2418 },
2419
2420 AntiJoin {
2421 pos: Box<HydroNode>,
2422 neg: Box<HydroNode>,
2423 metadata: HydroIrMetadata,
2424 },
2425
2426 ResolveFutures {
2427 input: Box<HydroNode>,
2428 metadata: HydroIrMetadata,
2429 },
2430 ResolveFuturesBlocking {
2431 input: Box<HydroNode>,
2432 metadata: HydroIrMetadata,
2433 },
2434 ResolveFuturesOrdered {
2435 input: Box<HydroNode>,
2436 metadata: HydroIrMetadata,
2437 },
2438
2439 Map {
2440 f: ClosureExpr,
2441 input: Box<HydroNode>,
2442 metadata: HydroIrMetadata,
2443 },
2444 FlatMap {
2445 f: ClosureExpr,
2446 input: Box<HydroNode>,
2447 metadata: HydroIrMetadata,
2448 },
2449 FlatMapStreamBlocking {
2450 f: ClosureExpr,
2451 input: Box<HydroNode>,
2452 metadata: HydroIrMetadata,
2453 },
2454 Filter {
2455 f: ClosureExpr,
2456 input: Box<HydroNode>,
2457 metadata: HydroIrMetadata,
2458 },
2459 FilterMap {
2460 f: ClosureExpr,
2461 input: Box<HydroNode>,
2462 metadata: HydroIrMetadata,
2463 },
2464
2465 DeferTick {
2466 input: Box<HydroNode>,
2467 metadata: HydroIrMetadata,
2468 },
2469 Enumerate {
2470 input: Box<HydroNode>,
2471 metadata: HydroIrMetadata,
2472 },
2473 Inspect {
2474 f: ClosureExpr,
2475 input: Box<HydroNode>,
2476 metadata: HydroIrMetadata,
2477 },
2478
2479 Unique {
2480 input: Box<HydroNode>,
2481 metadata: HydroIrMetadata,
2482 },
2483
2484 Sort {
2485 input: Box<HydroNode>,
2486 metadata: HydroIrMetadata,
2487 },
2488 Fold {
2489 init: ClosureExpr,
2490 acc: ClosureExpr,
2491 input: Box<HydroNode>,
2492 metadata: HydroIrMetadata,
2493 },
2494
2495 Scan {
2496 init: ClosureExpr,
2497 acc: ClosureExpr,
2498 input: Box<HydroNode>,
2499 metadata: HydroIrMetadata,
2500 },
2501 ScanAsyncBlocking {
2502 init: ClosureExpr,
2503 acc: ClosureExpr,
2504 input: Box<HydroNode>,
2505 metadata: HydroIrMetadata,
2506 },
2507 FoldKeyed {
2508 init: ClosureExpr,
2509 acc: ClosureExpr,
2510 input: Box<HydroNode>,
2511 metadata: HydroIrMetadata,
2512 },
2513
2514 Reduce {
2515 f: ClosureExpr,
2516 input: Box<HydroNode>,
2517 metadata: HydroIrMetadata,
2518 },
2519 ReduceKeyed {
2520 f: ClosureExpr,
2521 input: Box<HydroNode>,
2522 metadata: HydroIrMetadata,
2523 },
2524 ReduceKeyedWatermark {
2525 f: ClosureExpr,
2526 input: Box<HydroNode>,
2527 watermark: Box<HydroNode>,
2528 metadata: HydroIrMetadata,
2529 },
2530
2531 Network {
2532 name: Option<String>,
2533 networking_info: crate::networking::NetworkingInfo,
2534 serialize_fn: Option<DebugExpr>,
2535 instantiate_fn: DebugInstantiate,
2536 deserialize_fn: Option<DebugExpr>,
2537 input: Box<HydroNode>,
2538 metadata: HydroIrMetadata,
2539 },
2540
2541 ExternalInput {
2542 from_external_key: LocationKey,
2543 from_port_id: ExternalPortId,
2544 from_many: bool,
2545 codec_type: DebugType,
2546 #[serde(skip)]
2547 port_hint: NetworkHint,
2548 instantiate_fn: DebugInstantiate,
2549 deserialize_fn: Option<DebugExpr>,
2550 metadata: HydroIrMetadata,
2551 },
2552
2553 Counter {
2554 tag: String,
2555 duration: DebugExpr,
2556 prefix: String,
2557 input: Box<HydroNode>,
2558 metadata: HydroIrMetadata,
2559 },
2560
2561 AssertIsConsistent {
2562 inner: Box<HydroNode>,
2563 trusted: bool,
2564 metadata: HydroIrMetadata,
2565 },
2566
2567 UnboundSingleton {
2568 inner: Box<HydroNode>,
2569 metadata: HydroIrMetadata,
2570 },
2571}
2572
2573pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2574pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2575
2576impl HydroNode {
2577 pub fn transform_bottom_up(
2578 &mut self,
2579 transform: &mut impl FnMut(&mut HydroNode),
2580 seen_tees: &mut SeenSharedNodes,
2581 check_well_formed: bool,
2582 ) {
2583 self.transform_children(
2584 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2585 seen_tees,
2586 );
2587
2588 transform(self);
2589
2590 let self_location = self.metadata().location_id.root();
2591
2592 if check_well_formed {
2593 match &*self {
2594 HydroNode::Network { .. } => {}
2595 _ => {
2596 self.input_metadata().iter().for_each(|i| {
2597 if i.location_id.root() != self_location {
2598 panic!(
2599 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2600 i,
2601 i.location_id.root(),
2602 self,
2603 self_location
2604 )
2605 }
2606 });
2607 }
2608 }
2609 }
2610 }
2611
2612 #[inline(always)]
2613 pub fn transform_children(
2614 &mut self,
2615 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2616 seen_tees: &mut SeenSharedNodes,
2617 ) {
2618 match self {
2619 HydroNode::Placeholder => {
2620 panic!();
2621 }
2622
2623 HydroNode::Source { .. }
2624 | HydroNode::SingletonSource { .. }
2625 | HydroNode::CycleSource { .. }
2626 | HydroNode::ExternalInput { .. } => {}
2627
2628 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2629 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2630 *inner = SharedNode(transformed.clone());
2631 } else {
2632 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2633 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2634 let mut orig = inner.0.replace(HydroNode::Placeholder);
2635 transform(&mut orig, seen_tees);
2636 *transformed_cell.borrow_mut() = orig;
2637 *inner = SharedNode(transformed_cell);
2638 }
2639 }
2640
2641 HydroNode::Partition { inner, f, .. } => {
2642 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2643 *inner = SharedNode(transformed.clone());
2644 } else {
2645 f.transform_children(&mut transform, seen_tees);
2646 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2647 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2648 let mut orig = inner.0.replace(HydroNode::Placeholder);
2649 transform(&mut orig, seen_tees);
2650 *transformed_cell.borrow_mut() = orig;
2651 *inner = SharedNode(transformed_cell);
2652 }
2653 }
2654
2655 HydroNode::Cast { inner, .. }
2656 | HydroNode::ObserveNonDet { inner, .. }
2657 | HydroNode::BeginAtomic { inner, .. }
2658 | HydroNode::EndAtomic { inner, .. }
2659 | HydroNode::Batch { inner, .. }
2660 | HydroNode::YieldConcat { inner, .. }
2661 | HydroNode::UnboundSingleton { inner, .. }
2662 | HydroNode::AssertIsConsistent { inner, .. } => {
2663 transform(inner.as_mut(), seen_tees);
2664 }
2665
2666 HydroNode::Chain { first, second, .. } => {
2667 transform(first.as_mut(), seen_tees);
2668 transform(second.as_mut(), seen_tees);
2669 }
2670
2671 HydroNode::MergeOrdered { first, second, .. } => {
2672 transform(first.as_mut(), seen_tees);
2673 transform(second.as_mut(), seen_tees);
2674 }
2675
2676 HydroNode::ChainFirst { first, second, .. } => {
2677 transform(first.as_mut(), seen_tees);
2678 transform(second.as_mut(), seen_tees);
2679 }
2680
2681 HydroNode::CrossSingleton { left, right, .. }
2682 | HydroNode::CrossProduct { left, right, .. }
2683 | HydroNode::Join { left, right, .. }
2684 | HydroNode::JoinHalf { left, right, .. } => {
2685 transform(left.as_mut(), seen_tees);
2686 transform(right.as_mut(), seen_tees);
2687 }
2688
2689 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2690 transform(pos.as_mut(), seen_tees);
2691 transform(neg.as_mut(), seen_tees);
2692 }
2693
2694 HydroNode::Map { f, input, .. } => {
2695 f.transform_children(&mut transform, seen_tees);
2696 transform(input.as_mut(), seen_tees);
2697 }
2698 HydroNode::FlatMap { f, input, .. }
2699 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2700 | HydroNode::Filter { f, input, .. }
2701 | HydroNode::FilterMap { f, input, .. }
2702 | HydroNode::Inspect { f, input, .. }
2703 | HydroNode::Reduce { f, input, .. }
2704 | HydroNode::ReduceKeyed { f, input, .. } => {
2705 f.transform_children(&mut transform, seen_tees);
2706 transform(input.as_mut(), seen_tees);
2707 }
2708 HydroNode::ReduceKeyedWatermark {
2709 f,
2710 input,
2711 watermark,
2712 ..
2713 } => {
2714 f.transform_children(&mut transform, seen_tees);
2715 transform(input.as_mut(), seen_tees);
2716 transform(watermark.as_mut(), seen_tees);
2717 }
2718 HydroNode::Fold {
2719 init, acc, input, ..
2720 }
2721 | HydroNode::Scan {
2722 init, acc, input, ..
2723 }
2724 | HydroNode::ScanAsyncBlocking {
2725 init, acc, input, ..
2726 }
2727 | HydroNode::FoldKeyed {
2728 init, acc, input, ..
2729 } => {
2730 init.transform_children(&mut transform, seen_tees);
2731 acc.transform_children(&mut transform, seen_tees);
2732 transform(input.as_mut(), seen_tees);
2733 }
2734 HydroNode::ResolveFutures { input, .. }
2735 | HydroNode::ResolveFuturesBlocking { input, .. }
2736 | HydroNode::ResolveFuturesOrdered { input, .. }
2737 | HydroNode::Sort { input, .. }
2738 | HydroNode::DeferTick { input, .. }
2739 | HydroNode::Enumerate { input, .. }
2740 | HydroNode::Unique { input, .. }
2741 | HydroNode::Network { input, .. }
2742 | HydroNode::Counter { input, .. } => {
2743 transform(input.as_mut(), seen_tees);
2744 }
2745 }
2746 }
2747
2748 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2749 match self {
2750 HydroNode::Placeholder => HydroNode::Placeholder,
2751 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2752 inner: Box::new(inner.deep_clone(seen_tees)),
2753 metadata: metadata.clone(),
2754 },
2755 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2756 inner: Box::new(inner.deep_clone(seen_tees)),
2757 metadata: metadata.clone(),
2758 },
2759 HydroNode::ObserveNonDet {
2760 inner,
2761 trusted,
2762 metadata,
2763 } => HydroNode::ObserveNonDet {
2764 inner: Box::new(inner.deep_clone(seen_tees)),
2765 trusted: *trusted,
2766 metadata: metadata.clone(),
2767 },
2768 HydroNode::AssertIsConsistent {
2769 inner,
2770 trusted,
2771 metadata,
2772 } => HydroNode::AssertIsConsistent {
2773 inner: Box::new(inner.deep_clone(seen_tees)),
2774 trusted: *trusted,
2775 metadata: metadata.clone(),
2776 },
2777 HydroNode::Source { source, metadata } => HydroNode::Source {
2778 source: source.clone(),
2779 metadata: metadata.clone(),
2780 },
2781 HydroNode::SingletonSource {
2782 value,
2783 first_tick_only,
2784 metadata,
2785 } => HydroNode::SingletonSource {
2786 value: value.clone(),
2787 first_tick_only: *first_tick_only,
2788 metadata: metadata.clone(),
2789 },
2790 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2791 cycle_id: *cycle_id,
2792 metadata: metadata.clone(),
2793 },
2794 HydroNode::Tee { inner, metadata }
2795 | HydroNode::Reference {
2796 inner, metadata, ..
2797 } => {
2798 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2799 SharedNode(transformed.clone())
2800 } else {
2801 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2802 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2803 let cloned = inner.0.borrow().deep_clone(seen_tees);
2804 *new_rc.borrow_mut() = cloned;
2805 SharedNode(new_rc)
2806 };
2807 if let HydroNode::Reference {
2808 kind,
2809 access_counter,
2810 ..
2811 } = self
2812 {
2813 HydroNode::Reference {
2814 inner: cloned_inner,
2815 kind: *kind,
2816 access_counter: access_counter.freeze(),
2817 metadata: metadata.clone(),
2818 }
2819 } else {
2820 HydroNode::Tee {
2821 inner: cloned_inner,
2822 metadata: metadata.clone(),
2823 }
2824 }
2825 }
2826 HydroNode::Partition {
2827 inner,
2828 f,
2829 is_true,
2830 metadata,
2831 } => {
2832 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2833 HydroNode::Partition {
2834 inner: SharedNode(transformed.clone()),
2835 f: f.deep_clone(seen_tees),
2836 is_true: *is_true,
2837 metadata: metadata.clone(),
2838 }
2839 } else {
2840 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2841 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2842 let cloned = inner.0.borrow().deep_clone(seen_tees);
2843 *new_rc.borrow_mut() = cloned;
2844 HydroNode::Partition {
2845 inner: SharedNode(new_rc),
2846 f: f.deep_clone(seen_tees),
2847 is_true: *is_true,
2848 metadata: metadata.clone(),
2849 }
2850 }
2851 }
2852 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2853 inner: Box::new(inner.deep_clone(seen_tees)),
2854 metadata: metadata.clone(),
2855 },
2856 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2857 inner: Box::new(inner.deep_clone(seen_tees)),
2858 metadata: metadata.clone(),
2859 },
2860 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2861 inner: Box::new(inner.deep_clone(seen_tees)),
2862 metadata: metadata.clone(),
2863 },
2864 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2865 inner: Box::new(inner.deep_clone(seen_tees)),
2866 metadata: metadata.clone(),
2867 },
2868 HydroNode::Chain {
2869 first,
2870 second,
2871 metadata,
2872 } => HydroNode::Chain {
2873 first: Box::new(first.deep_clone(seen_tees)),
2874 second: Box::new(second.deep_clone(seen_tees)),
2875 metadata: metadata.clone(),
2876 },
2877 HydroNode::MergeOrdered {
2878 first,
2879 second,
2880 metadata,
2881 } => HydroNode::MergeOrdered {
2882 first: Box::new(first.deep_clone(seen_tees)),
2883 second: Box::new(second.deep_clone(seen_tees)),
2884 metadata: metadata.clone(),
2885 },
2886 HydroNode::ChainFirst {
2887 first,
2888 second,
2889 metadata,
2890 } => HydroNode::ChainFirst {
2891 first: Box::new(first.deep_clone(seen_tees)),
2892 second: Box::new(second.deep_clone(seen_tees)),
2893 metadata: metadata.clone(),
2894 },
2895 HydroNode::CrossProduct {
2896 left,
2897 right,
2898 metadata,
2899 } => HydroNode::CrossProduct {
2900 left: Box::new(left.deep_clone(seen_tees)),
2901 right: Box::new(right.deep_clone(seen_tees)),
2902 metadata: metadata.clone(),
2903 },
2904 HydroNode::CrossSingleton {
2905 left,
2906 right,
2907 metadata,
2908 } => HydroNode::CrossSingleton {
2909 left: Box::new(left.deep_clone(seen_tees)),
2910 right: Box::new(right.deep_clone(seen_tees)),
2911 metadata: metadata.clone(),
2912 },
2913 HydroNode::Join {
2914 left,
2915 right,
2916 metadata,
2917 } => HydroNode::Join {
2918 left: Box::new(left.deep_clone(seen_tees)),
2919 right: Box::new(right.deep_clone(seen_tees)),
2920 metadata: metadata.clone(),
2921 },
2922 HydroNode::JoinHalf {
2923 left,
2924 right,
2925 metadata,
2926 } => HydroNode::JoinHalf {
2927 left: Box::new(left.deep_clone(seen_tees)),
2928 right: Box::new(right.deep_clone(seen_tees)),
2929 metadata: metadata.clone(),
2930 },
2931 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2932 pos: Box::new(pos.deep_clone(seen_tees)),
2933 neg: Box::new(neg.deep_clone(seen_tees)),
2934 metadata: metadata.clone(),
2935 },
2936 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2937 pos: Box::new(pos.deep_clone(seen_tees)),
2938 neg: Box::new(neg.deep_clone(seen_tees)),
2939 metadata: metadata.clone(),
2940 },
2941 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2942 input: Box::new(input.deep_clone(seen_tees)),
2943 metadata: metadata.clone(),
2944 },
2945 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2946 HydroNode::ResolveFuturesBlocking {
2947 input: Box::new(input.deep_clone(seen_tees)),
2948 metadata: metadata.clone(),
2949 }
2950 }
2951 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2952 HydroNode::ResolveFuturesOrdered {
2953 input: Box::new(input.deep_clone(seen_tees)),
2954 metadata: metadata.clone(),
2955 }
2956 }
2957 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2958 f: f.deep_clone(seen_tees),
2959 input: Box::new(input.deep_clone(seen_tees)),
2960 metadata: metadata.clone(),
2961 },
2962 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2963 f: f.deep_clone(seen_tees),
2964 input: Box::new(input.deep_clone(seen_tees)),
2965 metadata: metadata.clone(),
2966 },
2967 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2968 HydroNode::FlatMapStreamBlocking {
2969 f: f.deep_clone(seen_tees),
2970 input: Box::new(input.deep_clone(seen_tees)),
2971 metadata: metadata.clone(),
2972 }
2973 }
2974 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2975 f: f.deep_clone(seen_tees),
2976 input: Box::new(input.deep_clone(seen_tees)),
2977 metadata: metadata.clone(),
2978 },
2979 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2980 f: f.deep_clone(seen_tees),
2981 input: Box::new(input.deep_clone(seen_tees)),
2982 metadata: metadata.clone(),
2983 },
2984 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2985 input: Box::new(input.deep_clone(seen_tees)),
2986 metadata: metadata.clone(),
2987 },
2988 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2989 input: Box::new(input.deep_clone(seen_tees)),
2990 metadata: metadata.clone(),
2991 },
2992 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2993 f: f.deep_clone(seen_tees),
2994 input: Box::new(input.deep_clone(seen_tees)),
2995 metadata: metadata.clone(),
2996 },
2997 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2998 input: Box::new(input.deep_clone(seen_tees)),
2999 metadata: metadata.clone(),
3000 },
3001 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3002 input: Box::new(input.deep_clone(seen_tees)),
3003 metadata: metadata.clone(),
3004 },
3005 HydroNode::Fold {
3006 init,
3007 acc,
3008 input,
3009 metadata,
3010 } => HydroNode::Fold {
3011 init: init.deep_clone(seen_tees),
3012 acc: acc.deep_clone(seen_tees),
3013 input: Box::new(input.deep_clone(seen_tees)),
3014 metadata: metadata.clone(),
3015 },
3016 HydroNode::Scan {
3017 init,
3018 acc,
3019 input,
3020 metadata,
3021 } => HydroNode::Scan {
3022 init: init.deep_clone(seen_tees),
3023 acc: acc.deep_clone(seen_tees),
3024 input: Box::new(input.deep_clone(seen_tees)),
3025 metadata: metadata.clone(),
3026 },
3027 HydroNode::ScanAsyncBlocking {
3028 init,
3029 acc,
3030 input,
3031 metadata,
3032 } => HydroNode::ScanAsyncBlocking {
3033 init: init.deep_clone(seen_tees),
3034 acc: acc.deep_clone(seen_tees),
3035 input: Box::new(input.deep_clone(seen_tees)),
3036 metadata: metadata.clone(),
3037 },
3038 HydroNode::FoldKeyed {
3039 init,
3040 acc,
3041 input,
3042 metadata,
3043 } => HydroNode::FoldKeyed {
3044 init: init.deep_clone(seen_tees),
3045 acc: acc.deep_clone(seen_tees),
3046 input: Box::new(input.deep_clone(seen_tees)),
3047 metadata: metadata.clone(),
3048 },
3049 HydroNode::ReduceKeyedWatermark {
3050 f,
3051 input,
3052 watermark,
3053 metadata,
3054 } => HydroNode::ReduceKeyedWatermark {
3055 f: f.deep_clone(seen_tees),
3056 input: Box::new(input.deep_clone(seen_tees)),
3057 watermark: Box::new(watermark.deep_clone(seen_tees)),
3058 metadata: metadata.clone(),
3059 },
3060 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3061 f: f.deep_clone(seen_tees),
3062 input: Box::new(input.deep_clone(seen_tees)),
3063 metadata: metadata.clone(),
3064 },
3065 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3066 f: f.deep_clone(seen_tees),
3067 input: Box::new(input.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 },
3070 HydroNode::Network {
3071 name,
3072 networking_info,
3073 serialize_fn,
3074 instantiate_fn,
3075 deserialize_fn,
3076 input,
3077 metadata,
3078 } => HydroNode::Network {
3079 name: name.clone(),
3080 networking_info: networking_info.clone(),
3081 serialize_fn: serialize_fn.clone(),
3082 instantiate_fn: instantiate_fn.clone(),
3083 deserialize_fn: deserialize_fn.clone(),
3084 input: Box::new(input.deep_clone(seen_tees)),
3085 metadata: metadata.clone(),
3086 },
3087 HydroNode::ExternalInput {
3088 from_external_key,
3089 from_port_id,
3090 from_many,
3091 codec_type,
3092 port_hint,
3093 instantiate_fn,
3094 deserialize_fn,
3095 metadata,
3096 } => HydroNode::ExternalInput {
3097 from_external_key: *from_external_key,
3098 from_port_id: *from_port_id,
3099 from_many: *from_many,
3100 codec_type: codec_type.clone(),
3101 port_hint: *port_hint,
3102 instantiate_fn: instantiate_fn.clone(),
3103 deserialize_fn: deserialize_fn.clone(),
3104 metadata: metadata.clone(),
3105 },
3106 HydroNode::Counter {
3107 tag,
3108 duration,
3109 prefix,
3110 input,
3111 metadata,
3112 } => HydroNode::Counter {
3113 tag: tag.clone(),
3114 duration: duration.clone(),
3115 prefix: prefix.clone(),
3116 input: Box::new(input.deep_clone(seen_tees)),
3117 metadata: metadata.clone(),
3118 },
3119 }
3120 }
3121
3122 #[cfg(feature = "build")]
3123 pub fn emit_core(
3124 &mut self,
3125 builders_or_callback: &mut BuildersOrCallback<
3126 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3127 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3128 >,
3129 seen_tees: &mut SeenSharedNodes,
3130 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3131 next_stmt_id: &mut crate::Counter<StmtId>,
3132 fold_hooked_idents: &mut HashSet<String>,
3133 ) -> syn::Ident {
3134 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3135
3136 self.transform_bottom_up(
3137 &mut |node: &mut HydroNode| {
3138 let out_location = node.metadata().location_id.clone();
3139 match node {
3140 HydroNode::Placeholder => {
3141 panic!()
3142 }
3143
3144 HydroNode::Cast { .. } => {
3145 let _ = next_stmt_id.get_and_increment();
3148 match builders_or_callback {
3149 BuildersOrCallback::Builders(_) => {}
3150 BuildersOrCallback::Callback(_, node_callback) => {
3151 node_callback(node, next_stmt_id);
3152 }
3153 }
3154 }
3156
3157 HydroNode::UnboundSingleton { .. } => {
3158 let inner_ident = ident_stack.pop().unwrap();
3159
3160 let stmt_id = next_stmt_id.get_and_increment();
3161 let out_ident =
3162 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3163
3164 match builders_or_callback {
3165 BuildersOrCallback::Builders(graph_builders) => {
3166 if graph_builders.singleton_intermediates() {
3167 let builder = graph_builders.get_dfir_mut(&out_location);
3168 builder.add_dfir(
3169 parse_quote! {
3170 #out_ident = #inner_ident;
3171 },
3172 None,
3173 None,
3174 );
3175 } else {
3176 let builder = graph_builders.get_dfir_mut(&out_location);
3177 builder.add_dfir(
3178 parse_quote! {
3179 #out_ident = #inner_ident -> persist::<'static>();
3180 },
3181 None,
3182 None,
3183 );
3184 }
3185 }
3186 BuildersOrCallback::Callback(_, node_callback) => {
3187 node_callback(node, next_stmt_id);
3188 }
3189 }
3190
3191 ident_stack.push(out_ident);
3192 }
3193
3194 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3195 let inner_ident = ident_stack.pop().unwrap();
3196
3197 let stmt_id = next_stmt_id.get_and_increment();
3198 let out_ident =
3199 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3200
3201 match builders_or_callback {
3202 BuildersOrCallback::Builders(graph_builders) => {
3203 graph_builders.assert_is_consistent(
3204 *trusted,
3205 &inner.metadata().location_id,
3206 inner_ident,
3207 &out_ident,
3208 );
3209 }
3210 BuildersOrCallback::Callback(_, node_callback) => {
3211 node_callback(node, next_stmt_id);
3212 }
3213 }
3214
3215 ident_stack.push(out_ident);
3216 }
3217
3218 HydroNode::ObserveNonDet {
3219 inner,
3220 trusted,
3221 metadata,
3222 ..
3223 } => {
3224 let inner_ident = ident_stack.pop().unwrap();
3225
3226 let stmt_id = next_stmt_id.get_and_increment();
3227 let observe_ident =
3228 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3229
3230 match builders_or_callback {
3231 BuildersOrCallback::Builders(graph_builders) => {
3232 graph_builders.observe_nondet(
3233 *trusted,
3234 &inner.metadata().location_id,
3235 inner_ident,
3236 &inner.metadata().collection_kind,
3237 &observe_ident,
3238 &metadata.collection_kind,
3239 &metadata.op,
3240 );
3241 }
3242 BuildersOrCallback::Callback(_, node_callback) => {
3243 node_callback(node, next_stmt_id);
3244 }
3245 }
3246
3247 ident_stack.push(observe_ident);
3248 }
3249
3250 HydroNode::Batch {
3251 inner, metadata, ..
3252 } => {
3253 let inner_ident = ident_stack.pop().unwrap();
3254
3255 let stmt_id = next_stmt_id.get_and_increment();
3256 let batch_ident =
3257 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3258
3259 match builders_or_callback {
3260 BuildersOrCallback::Builders(graph_builders) => {
3261 graph_builders.batch(
3262 inner_ident,
3263 &inner.metadata().location_id,
3264 &inner.metadata().collection_kind,
3265 &batch_ident,
3266 &out_location,
3267 &metadata.op,
3268 fold_hooked_idents,
3269 );
3270 }
3271 BuildersOrCallback::Callback(_, node_callback) => {
3272 node_callback(node, next_stmt_id);
3273 }
3274 }
3275
3276 ident_stack.push(batch_ident);
3277 }
3278
3279 HydroNode::YieldConcat { inner, .. } => {
3280 let inner_ident = ident_stack.pop().unwrap();
3281
3282 let stmt_id = next_stmt_id.get_and_increment();
3283 let yield_ident =
3284 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3285
3286 match builders_or_callback {
3287 BuildersOrCallback::Builders(graph_builders) => {
3288 graph_builders.yield_from_tick(
3289 inner_ident,
3290 &inner.metadata().location_id,
3291 &inner.metadata().collection_kind,
3292 &yield_ident,
3293 &out_location,
3294 );
3295 }
3296 BuildersOrCallback::Callback(_, node_callback) => {
3297 node_callback(node, next_stmt_id);
3298 }
3299 }
3300
3301 ident_stack.push(yield_ident);
3302 }
3303
3304 HydroNode::BeginAtomic { inner, metadata } => {
3305 let inner_ident = ident_stack.pop().unwrap();
3306
3307 let stmt_id = next_stmt_id.get_and_increment();
3308 let begin_ident =
3309 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3310
3311 match builders_or_callback {
3312 BuildersOrCallback::Builders(graph_builders) => {
3313 graph_builders.begin_atomic(
3314 inner_ident,
3315 &inner.metadata().location_id,
3316 &inner.metadata().collection_kind,
3317 &begin_ident,
3318 &out_location,
3319 &metadata.op,
3320 );
3321 }
3322 BuildersOrCallback::Callback(_, node_callback) => {
3323 node_callback(node, next_stmt_id);
3324 }
3325 }
3326
3327 ident_stack.push(begin_ident);
3328 }
3329
3330 HydroNode::EndAtomic { inner, .. } => {
3331 let inner_ident = ident_stack.pop().unwrap();
3332
3333 let stmt_id = next_stmt_id.get_and_increment();
3334 let end_ident =
3335 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3336
3337 match builders_or_callback {
3338 BuildersOrCallback::Builders(graph_builders) => {
3339 graph_builders.end_atomic(
3340 inner_ident,
3341 &inner.metadata().location_id,
3342 &inner.metadata().collection_kind,
3343 &end_ident,
3344 );
3345 }
3346 BuildersOrCallback::Callback(_, node_callback) => {
3347 node_callback(node, next_stmt_id);
3348 }
3349 }
3350
3351 ident_stack.push(end_ident);
3352 }
3353
3354 HydroNode::Source {
3355 source, metadata, ..
3356 } => {
3357 if let HydroSource::ExternalNetwork() = source {
3358 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3359 } else {
3360 let stmt_id = next_stmt_id.get_and_increment();
3361 let source_ident =
3362 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3363
3364 let source_stmt = match source {
3365 HydroSource::Stream(expr) => {
3366 debug_assert!(metadata.location_id.is_top_level());
3367 parse_quote! {
3368 #source_ident = source_stream(#expr);
3369 }
3370 }
3371
3372 HydroSource::ExternalNetwork() => {
3373 unreachable!()
3374 }
3375
3376 HydroSource::Iter(expr) => {
3377 if metadata.location_id.is_top_level() {
3378 parse_quote! {
3379 #source_ident = source_iter(#expr);
3380 }
3381 } else {
3382 parse_quote! {
3384 #source_ident = source_iter(#expr) -> persist::<'static>();
3385 }
3386 }
3387 }
3388
3389 HydroSource::Spin() => {
3390 debug_assert!(metadata.location_id.is_top_level());
3391 parse_quote! {
3392 #source_ident = spin();
3393 }
3394 }
3395
3396 HydroSource::ClusterMembers(target_loc, state) => {
3397 debug_assert!(metadata.location_id.is_top_level());
3398
3399 let members_tee_ident = syn::Ident::new(
3400 &format!(
3401 "__cluster_members_tee_{}_{}",
3402 metadata.location_id.root().key(),
3403 target_loc.key(),
3404 ),
3405 Span::call_site(),
3406 );
3407
3408 match state {
3409 ClusterMembersState::Stream(d) => {
3410 parse_quote! {
3411 #members_tee_ident = source_stream(#d) -> tee();
3412 #source_ident = #members_tee_ident;
3413 }
3414 },
3415 ClusterMembersState::Uninit => syn::parse_quote! {
3416 #source_ident = source_stream(DUMMY);
3417 },
3418 ClusterMembersState::Tee(..) => parse_quote! {
3419 #source_ident = #members_tee_ident;
3420 },
3421 }
3422 }
3423
3424 HydroSource::Embedded(ident) => {
3425 parse_quote! {
3426 #source_ident = source_stream(#ident);
3427 }
3428 }
3429
3430 HydroSource::EmbeddedSingleton(ident) => {
3431 parse_quote! {
3432 #source_ident = source_iter([#ident]);
3433 }
3434 }
3435 };
3436
3437 match builders_or_callback {
3438 BuildersOrCallback::Builders(graph_builders) => {
3439 let builder = graph_builders.get_dfir_mut(&out_location);
3440 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3441 }
3442 BuildersOrCallback::Callback(_, node_callback) => {
3443 node_callback(node, next_stmt_id);
3444 }
3445 }
3446
3447 ident_stack.push(source_ident);
3448 }
3449 }
3450
3451 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3452 let stmt_id = next_stmt_id.get_and_increment();
3453 let source_ident =
3454 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3455
3456 match builders_or_callback {
3457 BuildersOrCallback::Builders(graph_builders) => {
3458 let builder = graph_builders.get_dfir_mut(&out_location);
3459
3460 if *first_tick_only {
3461 assert!(
3462 !metadata.location_id.is_top_level(),
3463 "first_tick_only SingletonSource must be inside a tick"
3464 );
3465 }
3466
3467 if *first_tick_only
3468 || (metadata.location_id.is_top_level()
3469 && metadata.collection_kind.is_bounded())
3470 {
3471 builder.add_dfir(
3472 parse_quote! {
3473 #source_ident = source_iter([#value]);
3474 },
3475 None,
3476 Some(&stmt_id.to_string()),
3477 );
3478 } else {
3479 builder.add_dfir(
3480 parse_quote! {
3481 #source_ident = source_iter([#value]) -> persist::<'static>();
3482 },
3483 None,
3484 Some(&stmt_id.to_string()),
3485 );
3486 }
3487 }
3488 BuildersOrCallback::Callback(_, node_callback) => {
3489 node_callback(node, next_stmt_id);
3490 }
3491 }
3492
3493 ident_stack.push(source_ident);
3494 }
3495
3496 HydroNode::CycleSource { cycle_id, .. } => {
3497 let ident = cycle_id.as_ident();
3498
3499 let _ = next_stmt_id.get_and_increment();
3501
3502 match builders_or_callback {
3503 BuildersOrCallback::Builders(_) => {}
3504 BuildersOrCallback::Callback(_, node_callback) => {
3505 node_callback(node, next_stmt_id);
3506 }
3507 }
3508
3509 ident_stack.push(ident);
3510 }
3511
3512 HydroNode::Tee { inner, .. } => {
3513 let stmt_id = next_stmt_id.get_and_increment();
3516
3517 let ret_ident = if let Some(built_idents) =
3518 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3519 {
3520 match builders_or_callback {
3521 BuildersOrCallback::Builders(_) => {}
3522 BuildersOrCallback::Callback(_, node_callback) => {
3523 node_callback(node, next_stmt_id);
3524 }
3525 }
3526
3527 built_idents[0].clone()
3528 } else {
3529 let inner_ident = ident_stack.pop().unwrap();
3532
3533 let tee_ident =
3534 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3535
3536 built_tees.insert(
3537 inner.0.as_ref() as *const RefCell<HydroNode>,
3538 vec![tee_ident.clone()],
3539 );
3540
3541 match builders_or_callback {
3542 BuildersOrCallback::Builders(graph_builders) => {
3543 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3555 fold_hooked_idents.insert(tee_ident.to_string());
3556 }
3557 let builder = graph_builders.get_dfir_mut(&out_location);
3558 builder.add_dfir(
3559 parse_quote! {
3560 #tee_ident = #inner_ident -> tee();
3561 },
3562 None,
3563 Some(&stmt_id.to_string()),
3564 );
3565 }
3566 BuildersOrCallback::Callback(_, node_callback) => {
3567 node_callback(node, next_stmt_id);
3568 }
3569 }
3570
3571 tee_ident
3572 };
3573
3574 ident_stack.push(ret_ident);
3575 }
3576
3577 HydroNode::Reference { inner, kind, .. } => {
3578 let stmt_id = next_stmt_id.get_and_increment();
3581
3582 let ret_ident = if let Some(built_idents) =
3583 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3584 {
3585 built_idents[0].clone()
3586 } else {
3587 let inner_ident = ident_stack.pop().unwrap();
3588
3589 let ref_ident =
3590 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3591
3592 built_tees.insert(
3593 inner.0.as_ref() as *const RefCell<HydroNode>,
3594 vec![ref_ident.clone()],
3595 );
3596
3597 match builders_or_callback {
3598 BuildersOrCallback::Builders(graph_builders) => {
3599 let builder = graph_builders.get_dfir_mut(&out_location);
3600 let op_ident = syn::Ident::new(
3601 match kind {
3602 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3603 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3604 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3605 },
3606 Span::call_site(),
3607 );
3608 builder.add_dfir(
3609 parse_quote! {
3610 #ref_ident = #inner_ident -> #op_ident();
3611 },
3612 None,
3613 Some(&stmt_id.to_string()),
3614 );
3615 }
3616 BuildersOrCallback::Callback(_, node_callback) => {
3617 node_callback(node, next_stmt_id);
3618 }
3619 }
3620
3621 ref_ident
3622 };
3623
3624 ident_stack.push(ret_ident);
3625 }
3626
3627 HydroNode::Partition {
3628 inner, f, is_true, ..
3629 } => {
3630 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3632 let stmt_id = next_stmt_id.get_and_increment();
3633
3634 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3635 match builders_or_callback {
3636 BuildersOrCallback::Builders(_) => {}
3637 BuildersOrCallback::Callback(_, node_callback) => {
3638 node_callback(node, next_stmt_id);
3639 }
3640 }
3641
3642 let idx = if is_true { 0 } else { 1 };
3643 built_idents[idx].clone()
3644 } else {
3645 let inner_ident = ident_stack.pop().unwrap();
3648 let f_tokens = f.emit_tokens(&mut ident_stack);
3649
3650 let partition_ident = syn::Ident::new(
3651 &format!("stream_{}_partition", stmt_id),
3652 Span::call_site(),
3653 );
3654 let true_ident = syn::Ident::new(
3655 &format!("stream_{}_true", stmt_id),
3656 Span::call_site(),
3657 );
3658 let false_ident = syn::Ident::new(
3659 &format!("stream_{}_false", stmt_id),
3660 Span::call_site(),
3661 );
3662
3663 built_tees.insert(
3664 ptr,
3665 vec![true_ident.clone(), false_ident.clone()],
3666 );
3667
3668 let stmt_id = next_stmt_id.get_and_increment();
3669 match builders_or_callback {
3670 BuildersOrCallback::Builders(graph_builders) => {
3671 let builder = graph_builders.get_dfir_mut(&out_location);
3672 builder.add_dfir(
3673 parse_quote! {
3674 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3675 #true_ident = #partition_ident[0];
3676 #false_ident = #partition_ident[1];
3677 },
3678 None,
3679 Some(&stmt_id.to_string()),
3680 );
3681 }
3682 BuildersOrCallback::Callback(_, node_callback) => {
3683 node_callback(node, next_stmt_id);
3684 }
3685 }
3686
3687 if is_true { true_ident } else { false_ident }
3688 };
3689
3690 ident_stack.push(ret_ident);
3691 }
3692
3693 HydroNode::Chain { .. } => {
3694 let second_ident = ident_stack.pop().unwrap();
3696 let first_ident = ident_stack.pop().unwrap();
3697
3698 let stmt_id = next_stmt_id.get_and_increment();
3699 let chain_ident =
3700 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3701
3702 match builders_or_callback {
3703 BuildersOrCallback::Builders(graph_builders) => {
3704 let builder = graph_builders.get_dfir_mut(&out_location);
3705 builder.add_dfir(
3706 parse_quote! {
3707 #chain_ident = chain();
3708 #first_ident -> [0]#chain_ident;
3709 #second_ident -> [1]#chain_ident;
3710 },
3711 None,
3712 Some(&stmt_id.to_string()),
3713 );
3714 }
3715 BuildersOrCallback::Callback(_, node_callback) => {
3716 node_callback(node, next_stmt_id);
3717 }
3718 }
3719
3720 ident_stack.push(chain_ident);
3721 }
3722
3723 HydroNode::MergeOrdered { first, metadata, .. } => {
3724 let second_ident = ident_stack.pop().unwrap();
3725 let first_ident = ident_stack.pop().unwrap();
3726
3727 let stmt_id = next_stmt_id.get_and_increment();
3728 let merge_ident =
3729 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3730
3731 match builders_or_callback {
3732 BuildersOrCallback::Builders(graph_builders) => {
3733 graph_builders.merge_ordered(
3734 &first.metadata().location_id,
3735 first_ident,
3736 second_ident,
3737 &merge_ident,
3738 &first.metadata().collection_kind,
3739 &metadata.op,
3740 Some(&stmt_id.to_string()),
3741 );
3742 }
3743 BuildersOrCallback::Callback(_, node_callback) => {
3744 node_callback(node, next_stmt_id);
3745 }
3746 }
3747
3748 ident_stack.push(merge_ident);
3749 }
3750
3751 HydroNode::ChainFirst { .. } => {
3752 let second_ident = ident_stack.pop().unwrap();
3753 let first_ident = ident_stack.pop().unwrap();
3754
3755 let stmt_id = next_stmt_id.get_and_increment();
3756 let chain_ident =
3757 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3758
3759 match builders_or_callback {
3760 BuildersOrCallback::Builders(graph_builders) => {
3761 let builder = graph_builders.get_dfir_mut(&out_location);
3762 builder.add_dfir(
3763 parse_quote! {
3764 #chain_ident = chain_first_n(1);
3765 #first_ident -> [0]#chain_ident;
3766 #second_ident -> [1]#chain_ident;
3767 },
3768 None,
3769 Some(&stmt_id.to_string()),
3770 );
3771 }
3772 BuildersOrCallback::Callback(_, node_callback) => {
3773 node_callback(node, next_stmt_id);
3774 }
3775 }
3776
3777 ident_stack.push(chain_ident);
3778 }
3779
3780 HydroNode::CrossSingleton { right, .. } => {
3781 let right_ident = ident_stack.pop().unwrap();
3782 let left_ident = ident_stack.pop().unwrap();
3783
3784 let stmt_id = next_stmt_id.get_and_increment();
3785 let cross_ident =
3786 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3787
3788 match builders_or_callback {
3789 BuildersOrCallback::Builders(graph_builders) => {
3790 let builder = graph_builders.get_dfir_mut(&out_location);
3791
3792 if right.metadata().location_id.is_top_level()
3793 && right.metadata().collection_kind.is_bounded()
3794 {
3795 builder.add_dfir(
3796 parse_quote! {
3797 #cross_ident = cross_singleton::<'static>();
3798 #left_ident -> [input]#cross_ident;
3799 #right_ident -> [single]#cross_ident;
3800 },
3801 None,
3802 Some(&stmt_id.to_string()),
3803 );
3804 } else {
3805 builder.add_dfir(
3806 parse_quote! {
3807 #cross_ident = cross_singleton();
3808 #left_ident -> [input]#cross_ident;
3809 #right_ident -> [single]#cross_ident;
3810 },
3811 None,
3812 Some(&stmt_id.to_string()),
3813 );
3814 }
3815 }
3816 BuildersOrCallback::Callback(_, node_callback) => {
3817 node_callback(node, next_stmt_id);
3818 }
3819 }
3820
3821 ident_stack.push(cross_ident);
3822 }
3823
3824 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3825 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3826 parse_quote!(cross_join_multiset)
3827 } else {
3828 parse_quote!(join_multiset)
3829 };
3830
3831 let (HydroNode::CrossProduct { left, right, .. }
3832 | HydroNode::Join { left, right, .. }) = node
3833 else {
3834 unreachable!()
3835 };
3836
3837 let is_top_level = left.metadata().location_id.is_top_level()
3838 && right.metadata().location_id.is_top_level();
3839 let left_lifetime = if left.metadata().location_id.is_top_level() {
3840 quote!('static)
3841 } else {
3842 quote!('tick)
3843 };
3844
3845 let right_lifetime = if right.metadata().location_id.is_top_level() {
3846 quote!('static)
3847 } else {
3848 quote!('tick)
3849 };
3850
3851 let right_ident = ident_stack.pop().unwrap();
3852 let left_ident = ident_stack.pop().unwrap();
3853
3854 let stmt_id = next_stmt_id.get_and_increment();
3855 let stream_ident =
3856 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3857
3858 match builders_or_callback {
3859 BuildersOrCallback::Builders(graph_builders) => {
3860 let builder = graph_builders.get_dfir_mut(&out_location);
3861 builder.add_dfir(
3862 if is_top_level {
3863 parse_quote! {
3866 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3867 #left_ident -> [0]#stream_ident;
3868 #right_ident -> [1]#stream_ident;
3869 }
3870 } else {
3871 parse_quote! {
3872 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3873 #left_ident -> [0]#stream_ident;
3874 #right_ident -> [1]#stream_ident;
3875 }
3876 }
3877 ,
3878 None,
3879 Some(&stmt_id.to_string()),
3880 );
3881 }
3882 BuildersOrCallback::Callback(_, node_callback) => {
3883 node_callback(node, next_stmt_id);
3884 }
3885 }
3886
3887 ident_stack.push(stream_ident);
3888 }
3889
3890 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3891 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3892 parse_quote!(difference)
3893 } else {
3894 parse_quote!(anti_join)
3895 };
3896
3897 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3898 node
3899 else {
3900 unreachable!()
3901 };
3902
3903 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3904 quote!('static)
3905 } else {
3906 quote!('tick)
3907 };
3908
3909 let neg_ident = ident_stack.pop().unwrap();
3910 let pos_ident = ident_stack.pop().unwrap();
3911
3912 let stmt_id = next_stmt_id.get_and_increment();
3913 let stream_ident =
3914 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3915
3916 match builders_or_callback {
3917 BuildersOrCallback::Builders(graph_builders) => {
3918 let builder = graph_builders.get_dfir_mut(&out_location);
3919 builder.add_dfir(
3920 parse_quote! {
3921 #stream_ident = #operator::<'tick, #neg_lifetime>();
3922 #pos_ident -> [pos]#stream_ident;
3923 #neg_ident -> [neg]#stream_ident;
3924 },
3925 None,
3926 Some(&stmt_id.to_string()),
3927 );
3928 }
3929 BuildersOrCallback::Callback(_, node_callback) => {
3930 node_callback(node, next_stmt_id);
3931 }
3932 }
3933
3934 ident_stack.push(stream_ident);
3935 }
3936
3937 HydroNode::JoinHalf { .. } => {
3938 let HydroNode::JoinHalf { right, .. } = node else {
3939 unreachable!()
3940 };
3941
3942 assert!(
3943 right.metadata().collection_kind.is_bounded(),
3944 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3945 right.metadata().collection_kind
3946 );
3947
3948 let build_lifetime = if right.metadata().location_id.is_top_level() {
3949 quote!('static)
3950 } else {
3951 quote!('tick)
3952 };
3953
3954 let build_ident = ident_stack.pop().unwrap();
3955 let probe_ident = ident_stack.pop().unwrap();
3956
3957 let stmt_id = next_stmt_id.get_and_increment();
3958 let stream_ident =
3959 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3960
3961 match builders_or_callback {
3962 BuildersOrCallback::Builders(graph_builders) => {
3963 let builder = graph_builders.get_dfir_mut(&out_location);
3964 builder.add_dfir(
3965 parse_quote! {
3966 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3967 #probe_ident -> [probe]#stream_ident;
3968 #build_ident -> [build]#stream_ident;
3969 },
3970 None,
3971 Some(&stmt_id.to_string()),
3972 );
3973 }
3974 BuildersOrCallback::Callback(_, node_callback) => {
3975 node_callback(node, next_stmt_id);
3976 }
3977 }
3978
3979 ident_stack.push(stream_ident);
3980 }
3981
3982 HydroNode::ResolveFutures { .. } => {
3983 let input_ident = ident_stack.pop().unwrap();
3984
3985 let stmt_id = next_stmt_id.get_and_increment();
3986 let futures_ident =
3987 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3988
3989 match builders_or_callback {
3990 BuildersOrCallback::Builders(graph_builders) => {
3991 let builder = graph_builders.get_dfir_mut(&out_location);
3992 builder.add_dfir(
3993 parse_quote! {
3994 #futures_ident = #input_ident -> resolve_futures();
3995 },
3996 None,
3997 Some(&stmt_id.to_string()),
3998 );
3999 }
4000 BuildersOrCallback::Callback(_, node_callback) => {
4001 node_callback(node, next_stmt_id);
4002 }
4003 }
4004
4005 ident_stack.push(futures_ident);
4006 }
4007
4008 HydroNode::ResolveFuturesBlocking { .. } => {
4009 let input_ident = ident_stack.pop().unwrap();
4010
4011 let stmt_id = next_stmt_id.get_and_increment();
4012 let futures_ident =
4013 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4014
4015 match builders_or_callback {
4016 BuildersOrCallback::Builders(graph_builders) => {
4017 let builder = graph_builders.get_dfir_mut(&out_location);
4018 builder.add_dfir(
4019 parse_quote! {
4020 #futures_ident = #input_ident -> resolve_futures_blocking();
4021 },
4022 None,
4023 Some(&stmt_id.to_string()),
4024 );
4025 }
4026 BuildersOrCallback::Callback(_, node_callback) => {
4027 node_callback(node, next_stmt_id);
4028 }
4029 }
4030
4031 ident_stack.push(futures_ident);
4032 }
4033
4034 HydroNode::ResolveFuturesOrdered { .. } => {
4035 let input_ident = ident_stack.pop().unwrap();
4036
4037 let stmt_id = next_stmt_id.get_and_increment();
4038 let futures_ident =
4039 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4040
4041 match builders_or_callback {
4042 BuildersOrCallback::Builders(graph_builders) => {
4043 let builder = graph_builders.get_dfir_mut(&out_location);
4044 builder.add_dfir(
4045 parse_quote! {
4046 #futures_ident = #input_ident -> resolve_futures_ordered();
4047 },
4048 None,
4049 Some(&stmt_id.to_string()),
4050 );
4051 }
4052 BuildersOrCallback::Callback(_, node_callback) => {
4053 node_callback(node, next_stmt_id);
4054 }
4055 }
4056
4057 ident_stack.push(futures_ident);
4058 }
4059
4060 HydroNode::Map { f, .. } => {
4061 let input_ident = ident_stack.pop().unwrap();
4063 let f_tokens = f.emit_tokens(&mut ident_stack);
4064
4065 let stmt_id = next_stmt_id.get_and_increment();
4066 let map_ident =
4067 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4068
4069 match builders_or_callback {
4070 BuildersOrCallback::Builders(graph_builders) => {
4071 let builder = graph_builders.get_dfir_mut(&out_location);
4072 builder.add_dfir(
4073 parse_quote! {
4074 #map_ident = #input_ident -> map(#f_tokens);
4075 },
4076 None,
4077 Some(&stmt_id.to_string()),
4078 );
4079 }
4080 BuildersOrCallback::Callback(_, node_callback) => {
4081 node_callback(node, next_stmt_id);
4082 }
4083 }
4084
4085 ident_stack.push(map_ident);
4086 }
4087
4088 HydroNode::FlatMap { f, .. } => {
4089 let input_ident = ident_stack.pop().unwrap();
4090 let f_tokens = f.emit_tokens(&mut ident_stack);
4091
4092 let stmt_id = next_stmt_id.get_and_increment();
4093 let flat_map_ident =
4094 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4095
4096 match builders_or_callback {
4097 BuildersOrCallback::Builders(graph_builders) => {
4098 let builder = graph_builders.get_dfir_mut(&out_location);
4099 builder.add_dfir(
4100 parse_quote! {
4101 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4102 },
4103 None,
4104 Some(&stmt_id.to_string()),
4105 );
4106 }
4107 BuildersOrCallback::Callback(_, node_callback) => {
4108 node_callback(node, next_stmt_id);
4109 }
4110 }
4111
4112 ident_stack.push(flat_map_ident);
4113 }
4114
4115 HydroNode::FlatMapStreamBlocking { f, .. } => {
4116 let input_ident = ident_stack.pop().unwrap();
4117 let f_tokens = f.emit_tokens(&mut ident_stack);
4118
4119 let stmt_id = next_stmt_id.get_and_increment();
4120 let flat_map_stream_blocking_ident =
4121 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4122
4123 match builders_or_callback {
4124 BuildersOrCallback::Builders(graph_builders) => {
4125 let builder = graph_builders.get_dfir_mut(&out_location);
4126 builder.add_dfir(
4127 parse_quote! {
4128 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4129 },
4130 None,
4131 Some(&stmt_id.to_string()),
4132 );
4133 }
4134 BuildersOrCallback::Callback(_, node_callback) => {
4135 node_callback(node, next_stmt_id);
4136 }
4137 }
4138
4139 ident_stack.push(flat_map_stream_blocking_ident);
4140 }
4141
4142 HydroNode::Filter { f, .. } => {
4143 let input_ident = ident_stack.pop().unwrap();
4144 let f_tokens = f.emit_tokens(&mut ident_stack);
4145
4146 let stmt_id = next_stmt_id.get_and_increment();
4147 let filter_ident =
4148 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4149
4150 match builders_or_callback {
4151 BuildersOrCallback::Builders(graph_builders) => {
4152 let builder = graph_builders.get_dfir_mut(&out_location);
4153 builder.add_dfir(
4154 parse_quote! {
4155 #filter_ident = #input_ident -> filter(#f_tokens);
4156 },
4157 None,
4158 Some(&stmt_id.to_string()),
4159 );
4160 }
4161 BuildersOrCallback::Callback(_, node_callback) => {
4162 node_callback(node, next_stmt_id);
4163 }
4164 }
4165
4166 ident_stack.push(filter_ident);
4167 }
4168
4169 HydroNode::FilterMap { f, .. } => {
4170 let input_ident = ident_stack.pop().unwrap();
4171 let f_tokens = f.emit_tokens(&mut ident_stack);
4172
4173 let stmt_id = next_stmt_id.get_and_increment();
4174 let filter_map_ident =
4175 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4176
4177 match builders_or_callback {
4178 BuildersOrCallback::Builders(graph_builders) => {
4179 let builder = graph_builders.get_dfir_mut(&out_location);
4180 builder.add_dfir(
4181 parse_quote! {
4182 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4183 },
4184 None,
4185 Some(&stmt_id.to_string()),
4186 );
4187 }
4188 BuildersOrCallback::Callback(_, node_callback) => {
4189 node_callback(node, next_stmt_id);
4190 }
4191 }
4192
4193 ident_stack.push(filter_map_ident);
4194 }
4195
4196 HydroNode::Sort { .. } => {
4197 let input_ident = ident_stack.pop().unwrap();
4198
4199 let stmt_id = next_stmt_id.get_and_increment();
4200 let sort_ident =
4201 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4202
4203 match builders_or_callback {
4204 BuildersOrCallback::Builders(graph_builders) => {
4205 let builder = graph_builders.get_dfir_mut(&out_location);
4206 builder.add_dfir(
4207 parse_quote! {
4208 #sort_ident = #input_ident -> sort();
4209 },
4210 None,
4211 Some(&stmt_id.to_string()),
4212 );
4213 }
4214 BuildersOrCallback::Callback(_, node_callback) => {
4215 node_callback(node, next_stmt_id);
4216 }
4217 }
4218
4219 ident_stack.push(sort_ident);
4220 }
4221
4222 HydroNode::DeferTick { .. } => {
4223 let input_ident = ident_stack.pop().unwrap();
4224
4225 let stmt_id = next_stmt_id.get_and_increment();
4226 let defer_tick_ident =
4227 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4228
4229 match builders_or_callback {
4230 BuildersOrCallback::Builders(graph_builders) => {
4231 let builder = graph_builders.get_dfir_mut(&out_location);
4232 builder.add_dfir(
4233 parse_quote! {
4234 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4235 },
4236 None,
4237 Some(&stmt_id.to_string()),
4238 );
4239 }
4240 BuildersOrCallback::Callback(_, node_callback) => {
4241 node_callback(node, next_stmt_id);
4242 }
4243 }
4244
4245 ident_stack.push(defer_tick_ident);
4246 }
4247
4248 HydroNode::Enumerate { input, .. } => {
4249 let input_ident = ident_stack.pop().unwrap();
4250
4251 let stmt_id = next_stmt_id.get_and_increment();
4252 let enumerate_ident =
4253 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4254
4255 match builders_or_callback {
4256 BuildersOrCallback::Builders(graph_builders) => {
4257 let builder = graph_builders.get_dfir_mut(&out_location);
4258 let lifetime = if input.metadata().location_id.is_top_level() {
4259 quote!('static)
4260 } else {
4261 quote!('tick)
4262 };
4263 builder.add_dfir(
4264 parse_quote! {
4265 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4266 },
4267 None,
4268 Some(&stmt_id.to_string()),
4269 );
4270 }
4271 BuildersOrCallback::Callback(_, node_callback) => {
4272 node_callback(node, next_stmt_id);
4273 }
4274 }
4275
4276 ident_stack.push(enumerate_ident);
4277 }
4278
4279 HydroNode::Inspect { f, .. } => {
4280 let input_ident = ident_stack.pop().unwrap();
4281 let f_tokens = f.emit_tokens(&mut ident_stack);
4282
4283 let stmt_id = next_stmt_id.get_and_increment();
4284 let inspect_ident =
4285 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4286
4287 match builders_or_callback {
4288 BuildersOrCallback::Builders(graph_builders) => {
4289 let builder = graph_builders.get_dfir_mut(&out_location);
4290 builder.add_dfir(
4291 parse_quote! {
4292 #inspect_ident = #input_ident -> inspect(#f_tokens);
4293 },
4294 None,
4295 Some(&stmt_id.to_string()),
4296 );
4297 }
4298 BuildersOrCallback::Callback(_, node_callback) => {
4299 node_callback(node, next_stmt_id);
4300 }
4301 }
4302
4303 ident_stack.push(inspect_ident);
4304 }
4305
4306 HydroNode::Unique { input, .. } => {
4307 let input_ident = ident_stack.pop().unwrap();
4308
4309 let stmt_id = next_stmt_id.get_and_increment();
4310 let unique_ident =
4311 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4312
4313 match builders_or_callback {
4314 BuildersOrCallback::Builders(graph_builders) => {
4315 let builder = graph_builders.get_dfir_mut(&out_location);
4316 let lifetime = if input.metadata().location_id.is_top_level() {
4317 quote!('static)
4318 } else {
4319 quote!('tick)
4320 };
4321
4322 builder.add_dfir(
4323 parse_quote! {
4324 #unique_ident = #input_ident -> unique::<#lifetime>();
4325 },
4326 None,
4327 Some(&stmt_id.to_string()),
4328 );
4329 }
4330 BuildersOrCallback::Callback(_, node_callback) => {
4331 node_callback(node, next_stmt_id);
4332 }
4333 }
4334
4335 ident_stack.push(unique_ident);
4336 }
4337
4338 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4339 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4340 if input.metadata().location_id.is_top_level()
4341 && input.metadata().collection_kind.is_bounded()
4342 {
4343 parse_quote!(fold_no_replay)
4344 } else {
4345 parse_quote!(fold)
4346 }
4347 } else if matches!(node, HydroNode::Scan { .. }) {
4348 parse_quote!(scan)
4349 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4350 parse_quote!(scan_async_blocking)
4351 } else if let HydroNode::FoldKeyed { input, .. } = node {
4352 if input.metadata().location_id.is_top_level()
4353 && input.metadata().collection_kind.is_bounded()
4354 {
4355 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4356 } else {
4357 parse_quote!(fold_keyed)
4358 }
4359 } else {
4360 unreachable!()
4361 };
4362
4363 let (HydroNode::Fold { input, .. }
4364 | HydroNode::FoldKeyed { input, .. }
4365 | HydroNode::Scan { input, .. }
4366 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4367 else {
4368 unreachable!()
4369 };
4370
4371 let lifetime = if input.metadata().location_id.is_top_level() {
4372 quote!('static)
4373 } else {
4374 quote!('tick)
4375 };
4376
4377 let input_ident = ident_stack.pop().unwrap();
4378
4379 let (HydroNode::Fold { init, acc, .. }
4380 | HydroNode::FoldKeyed { init, acc, .. }
4381 | HydroNode::Scan { init, acc, .. }
4382 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4383 else {
4384 unreachable!()
4385 };
4386
4387 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4388 let init_tokens = init.emit_tokens(&mut ident_stack);
4389
4390 let stmt_id = next_stmt_id.get_and_increment();
4391 let fold_ident =
4392 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4393
4394 match builders_or_callback {
4395 BuildersOrCallback::Builders(graph_builders) => {
4396 if matches!(node, HydroNode::Fold { .. })
4397 && node.metadata().location_id.is_top_level()
4398 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4399 && graph_builders.singleton_intermediates()
4400 && !node.metadata().collection_kind.is_bounded()
4401 {
4402 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4403 let hooked_input_ident = graph_builders.emit_fold_hook(
4404 &input.metadata().location_id,
4405 &input_ident,
4406 &input.metadata().collection_kind,
4407 &node.metadata().op,
4408 );
4409
4410 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4411 let acc: syn::Expr = parse_quote!({
4412 let mut __inner = #acc_tokens;
4413 move |__state, __batch: Vec<_>| {
4414 if __batch.is_empty() {
4415 return None;
4416 }
4417 for __value in __batch {
4418 __inner(__state, __value);
4419 }
4420 Some(__state.clone())
4421 }
4422 });
4423 (hooked, acc)
4424 } else {
4425 let acc: syn::Expr = parse_quote!({
4426 let mut __inner = #acc_tokens;
4427 move |__state, __value| {
4428 __inner(__state, __value);
4429 Some(__state.clone())
4430 }
4431 });
4432 (&input_ident, acc)
4433 };
4434
4435 let builder = graph_builders.get_dfir_mut(&out_location);
4436 builder.add_dfir(
4437 parse_quote! {
4438 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4439 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4440 #fold_ident = chain();
4441 },
4442 None,
4443 Some(&stmt_id.to_string()),
4444 );
4445
4446 if hooked_input_ident.is_some() {
4447 fold_hooked_idents.insert(fold_ident.to_string());
4448 }
4449 } else if matches!(node, HydroNode::FoldKeyed { .. })
4450 && node.metadata().location_id.is_top_level()
4451 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4452 && graph_builders.singleton_intermediates()
4453 && !node.metadata().collection_kind.is_bounded()
4454 {
4455 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4456 let hooked_input_ident = graph_builders.emit_fold_hook(
4457 &input.metadata().location_id,
4458 &input_ident,
4459 &input.metadata().collection_kind,
4460 &node.metadata().op,
4461 );
4462 let builder = graph_builders.get_dfir_mut(&out_location);
4463
4464 let wrapped_acc: syn::Expr = parse_quote!({
4465 let mut __init = #init_tokens;
4466 let mut __inner = #acc_tokens;
4467 move |__state, __kv: (_, _)| {
4468 let __state = __state
4470 .entry(::std::clone::Clone::clone(&__kv.0))
4471 .or_insert_with(|| (__init)());
4472 __inner(__state, __kv.1);
4473 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4474 }
4475 });
4476
4477 if let Some(hooked_input_ident) = hooked_input_ident {
4478 builder.add_dfir(
4479 parse_quote! {
4480 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4481 },
4482 None,
4483 Some(&stmt_id.to_string()),
4484 );
4485
4486 fold_hooked_idents.insert(fold_ident.to_string());
4487 } else {
4488 builder.add_dfir(
4489 parse_quote! {
4490 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4491 },
4492 None,
4493 Some(&stmt_id.to_string()),
4494 );
4495 }
4496 } else if (matches!(node, HydroNode::Fold { .. })
4497 || matches!(node, HydroNode::FoldKeyed { .. }))
4498 && !node.metadata().location_id.is_top_level()
4499 && graph_builders.singleton_intermediates()
4500 {
4501 let input_ref = match &*node {
4502 HydroNode::Fold { input, .. } => input,
4503 HydroNode::FoldKeyed { input, .. } => input,
4504 _ => unreachable!(),
4505 };
4506 let hooked_input_ident = graph_builders.emit_fold_hook(
4507 &input_ref.metadata().location_id,
4508 &input_ident,
4509 &input_ref.metadata().collection_kind,
4510 &node.metadata().op,
4511 );
4512
4513 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4514 let builder = graph_builders.get_dfir_mut(&out_location);
4515 builder.add_dfir(
4516 parse_quote! {
4517 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4518 },
4519 None,
4520 Some(&stmt_id.to_string()),
4521 );
4522 } else {
4523 let builder = graph_builders.get_dfir_mut(&out_location);
4524 builder.add_dfir(
4525 parse_quote! {
4526 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4527 },
4528 None,
4529 Some(&stmt_id.to_string()),
4530 );
4531 }
4532 }
4533 BuildersOrCallback::Callback(_, node_callback) => {
4534 node_callback(node, next_stmt_id);
4535 }
4536 }
4537
4538 ident_stack.push(fold_ident);
4539 }
4540
4541 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4542 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4543 if input.metadata().location_id.is_top_level()
4544 && input.metadata().collection_kind.is_bounded()
4545 {
4546 parse_quote!(reduce_no_replay)
4547 } else {
4548 parse_quote!(reduce)
4549 }
4550 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4551 if input.metadata().location_id.is_top_level()
4552 && input.metadata().collection_kind.is_bounded()
4553 {
4554 todo!(
4555 "Calling keyed reduce on a top-level bounded collection is not supported"
4556 )
4557 } else {
4558 parse_quote!(reduce_keyed)
4559 }
4560 } else {
4561 unreachable!()
4562 };
4563
4564 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4565 else {
4566 unreachable!()
4567 };
4568
4569 let lifetime = if input.metadata().location_id.is_top_level() {
4570 quote!('static)
4571 } else {
4572 quote!('tick)
4573 };
4574
4575 let input_ident = ident_stack.pop().unwrap();
4576
4577 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4578 else {
4579 unreachable!()
4580 };
4581
4582 let f_tokens = f.emit_tokens(&mut ident_stack);
4583
4584 let stmt_id = next_stmt_id.get_and_increment();
4585 let reduce_ident =
4586 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4587
4588 match builders_or_callback {
4589 BuildersOrCallback::Builders(graph_builders) => {
4590 if matches!(node, HydroNode::Reduce { .. })
4591 && node.metadata().location_id.is_top_level()
4592 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4593 && graph_builders.singleton_intermediates()
4594 && !node.metadata().collection_kind.is_bounded()
4595 {
4596 todo!(
4597 "Reduce with optional intermediates is not yet supported in simulator"
4598 );
4599 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4600 && node.metadata().location_id.is_top_level()
4601 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4602 && graph_builders.singleton_intermediates()
4603 && !node.metadata().collection_kind.is_bounded()
4604 {
4605 todo!(
4606 "Reduce keyed with optional intermediates is not yet supported in simulator"
4607 );
4608 } else {
4609 let builder = graph_builders.get_dfir_mut(&out_location);
4610 builder.add_dfir(
4611 parse_quote! {
4612 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4613 },
4614 None,
4615 Some(&stmt_id.to_string()),
4616 );
4617 }
4618 }
4619 BuildersOrCallback::Callback(_, node_callback) => {
4620 node_callback(node, next_stmt_id);
4621 }
4622 }
4623
4624 ident_stack.push(reduce_ident);
4625 }
4626
4627 HydroNode::ReduceKeyedWatermark {
4628 f,
4629 input,
4630 metadata,
4631 ..
4632 } => {
4633 let lifetime = if input.metadata().location_id.is_top_level() {
4634 quote!('static)
4635 } else {
4636 quote!('tick)
4637 };
4638
4639 let watermark_ident = ident_stack.pop().unwrap();
4641 let input_ident = ident_stack.pop().unwrap();
4642 let f_tokens = f.emit_tokens(&mut ident_stack);
4643
4644 let stmt_id = next_stmt_id.get_and_increment();
4645 let chain_ident = syn::Ident::new(
4646 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4647 Span::call_site(),
4648 );
4649
4650 let fold_ident =
4651 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4652
4653 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4654 && input.metadata().collection_kind.is_bounded()
4655 {
4656 parse_quote!(fold_no_replay)
4657 } else {
4658 parse_quote!(fold)
4659 };
4660
4661 match builders_or_callback {
4662 BuildersOrCallback::Builders(graph_builders) => {
4663 if metadata.location_id.is_top_level()
4664 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4665 && graph_builders.singleton_intermediates()
4666 && !metadata.collection_kind.is_bounded()
4667 {
4668 todo!(
4669 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4670 )
4671 } else {
4672 let builder = graph_builders.get_dfir_mut(&out_location);
4673 builder.add_dfir(
4674 parse_quote! {
4675 #chain_ident = chain();
4676 #input_ident
4677 -> map(|x| (Some(x), None))
4678 -> [0]#chain_ident;
4679 #watermark_ident
4680 -> map(|watermark| (None, Some(watermark)))
4681 -> [1]#chain_ident;
4682
4683 #fold_ident = #chain_ident
4684 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4685 let __reduce_keyed_fn = #f_tokens;
4686 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4687 if let Some((k, v)) = opt_payload {
4688 if let Some(curr_watermark) = *opt_curr_watermark {
4689 if k < curr_watermark {
4690 return;
4691 }
4692 }
4693 match map.entry(k) {
4694 ::std::collections::hash_map::Entry::Vacant(e) => {
4695 e.insert(v);
4696 }
4697 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4698 __reduce_keyed_fn(e.get_mut(), v);
4699 }
4700 }
4701 } else {
4702 let watermark = opt_watermark.unwrap();
4703 if let Some(curr_watermark) = *opt_curr_watermark {
4704 if watermark <= curr_watermark {
4705 return;
4706 }
4707 }
4708 map.retain(|k, _| *k >= watermark);
4709 *opt_curr_watermark = Some(watermark);
4710 }
4711 }
4712 })
4713 -> flat_map(|(map, _curr_watermark)| map);
4714 },
4715 None,
4716 Some(&stmt_id.to_string()),
4717 );
4718 }
4719 }
4720 BuildersOrCallback::Callback(_, node_callback) => {
4721 node_callback(node, next_stmt_id);
4722 }
4723 }
4724
4725 ident_stack.push(fold_ident);
4726 }
4727
4728 HydroNode::Network {
4729 networking_info,
4730 serialize_fn: serialize_pipeline,
4731 instantiate_fn,
4732 deserialize_fn: deserialize_pipeline,
4733 input,
4734 ..
4735 } => {
4736 let input_ident = ident_stack.pop().unwrap();
4737
4738 let stmt_id = next_stmt_id.get_and_increment();
4739 let receiver_stream_ident =
4740 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4741
4742 match builders_or_callback {
4743 BuildersOrCallback::Builders(graph_builders) => {
4744 let (sink_expr, source_expr) = match instantiate_fn {
4745 DebugInstantiate::Building => (
4746 syn::parse_quote!(DUMMY_SINK),
4747 syn::parse_quote!(DUMMY_SOURCE),
4748 ),
4749
4750 DebugInstantiate::Finalized(finalized) => {
4751 (finalized.sink.clone(), finalized.source.clone())
4752 }
4753 };
4754
4755 graph_builders.create_network(
4756 &input.metadata().location_id,
4757 &out_location,
4758 input_ident,
4759 &receiver_stream_ident,
4760 serialize_pipeline.as_ref(),
4761 sink_expr,
4762 source_expr,
4763 deserialize_pipeline.as_ref(),
4764 stmt_id,
4765 networking_info,
4766 );
4767 }
4768 BuildersOrCallback::Callback(_, node_callback) => {
4769 node_callback(node, next_stmt_id);
4770 }
4771 }
4772
4773 ident_stack.push(receiver_stream_ident);
4774 }
4775
4776 HydroNode::ExternalInput {
4777 instantiate_fn,
4778 deserialize_fn: deserialize_pipeline,
4779 ..
4780 } => {
4781 let stmt_id = next_stmt_id.get_and_increment();
4782 let receiver_stream_ident =
4783 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4784
4785 match builders_or_callback {
4786 BuildersOrCallback::Builders(graph_builders) => {
4787 let (_, source_expr) = match instantiate_fn {
4788 DebugInstantiate::Building => (
4789 syn::parse_quote!(DUMMY_SINK),
4790 syn::parse_quote!(DUMMY_SOURCE),
4791 ),
4792
4793 DebugInstantiate::Finalized(finalized) => {
4794 (finalized.sink.clone(), finalized.source.clone())
4795 }
4796 };
4797
4798 graph_builders.create_external_source(
4799 &out_location,
4800 source_expr,
4801 &receiver_stream_ident,
4802 deserialize_pipeline.as_ref(),
4803 stmt_id,
4804 );
4805 }
4806 BuildersOrCallback::Callback(_, node_callback) => {
4807 node_callback(node, next_stmt_id);
4808 }
4809 }
4810
4811 ident_stack.push(receiver_stream_ident);
4812 }
4813
4814 HydroNode::Counter {
4815 tag,
4816 duration,
4817 prefix,
4818 ..
4819 } => {
4820 let input_ident = ident_stack.pop().unwrap();
4821
4822 let stmt_id = next_stmt_id.get_and_increment();
4823 let counter_ident =
4824 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4825
4826 match builders_or_callback {
4827 BuildersOrCallback::Builders(graph_builders) => {
4828 let arg = format!("{}({})", prefix, tag);
4829 let builder = graph_builders.get_dfir_mut(&out_location);
4830 builder.add_dfir(
4831 parse_quote! {
4832 #counter_ident = #input_ident -> _counter(#arg, #duration);
4833 },
4834 None,
4835 Some(&stmt_id.to_string()),
4836 );
4837 }
4838 BuildersOrCallback::Callback(_, node_callback) => {
4839 node_callback(node, next_stmt_id);
4840 }
4841 }
4842
4843 ident_stack.push(counter_ident);
4844 }
4845 }
4846 },
4847 seen_tees,
4848 false,
4849 );
4850
4851 let ret = ident_stack
4852 .pop()
4853 .expect("ident_stack should have exactly one element after traversal");
4854 assert!(
4855 ident_stack.is_empty(),
4856 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4857 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4858 ident_stack.len()
4859 );
4860 ret
4861 }
4862
4863 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4864 match self {
4865 HydroNode::Placeholder => {
4866 panic!()
4867 }
4868 HydroNode::Cast { .. }
4869 | HydroNode::ObserveNonDet { .. }
4870 | HydroNode::UnboundSingleton { .. }
4871 | HydroNode::AssertIsConsistent { .. } => {}
4872 HydroNode::Source { source, .. } => match source {
4873 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4874 HydroSource::ExternalNetwork()
4875 | HydroSource::Spin()
4876 | HydroSource::ClusterMembers(_, _)
4877 | HydroSource::Embedded(_)
4878 | HydroSource::EmbeddedSingleton(_) => {} },
4880 HydroNode::SingletonSource { value, .. } => {
4881 transform(value);
4882 }
4883 HydroNode::CycleSource { .. }
4884 | HydroNode::Tee { .. }
4885 | HydroNode::Reference { .. }
4886 | HydroNode::YieldConcat { .. }
4887 | HydroNode::BeginAtomic { .. }
4888 | HydroNode::EndAtomic { .. }
4889 | HydroNode::Batch { .. }
4890 | HydroNode::Chain { .. }
4891 | HydroNode::MergeOrdered { .. }
4892 | HydroNode::ChainFirst { .. }
4893 | HydroNode::CrossProduct { .. }
4894 | HydroNode::CrossSingleton { .. }
4895 | HydroNode::ResolveFutures { .. }
4896 | HydroNode::ResolveFuturesBlocking { .. }
4897 | HydroNode::ResolveFuturesOrdered { .. }
4898 | HydroNode::Join { .. }
4899 | HydroNode::JoinHalf { .. }
4900 | HydroNode::Difference { .. }
4901 | HydroNode::AntiJoin { .. }
4902 | HydroNode::DeferTick { .. }
4903 | HydroNode::Enumerate { .. }
4904 | HydroNode::Unique { .. }
4905 | HydroNode::Sort { .. } => {}
4906 HydroNode::Map { f, .. }
4907 | HydroNode::FlatMap { f, .. }
4908 | HydroNode::FlatMapStreamBlocking { f, .. }
4909 | HydroNode::Filter { f, .. }
4910 | HydroNode::FilterMap { f, .. }
4911 | HydroNode::Inspect { f, .. }
4912 | HydroNode::Partition { f, .. }
4913 | HydroNode::Reduce { f, .. }
4914 | HydroNode::ReduceKeyed { f, .. }
4915 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4916 transform(&mut f.expr);
4917 }
4918 HydroNode::Fold { init, acc, .. }
4919 | HydroNode::Scan { init, acc, .. }
4920 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4921 | HydroNode::FoldKeyed { init, acc, .. } => {
4922 transform(&mut init.expr);
4923 transform(&mut acc.expr);
4924 }
4925 HydroNode::Network {
4926 serialize_fn,
4927 deserialize_fn,
4928 ..
4929 } => {
4930 if let Some(serialize_fn) = serialize_fn {
4931 transform(serialize_fn);
4932 }
4933 if let Some(deserialize_fn) = deserialize_fn {
4934 transform(deserialize_fn);
4935 }
4936 }
4937 HydroNode::ExternalInput { deserialize_fn, .. } => {
4938 if let Some(deserialize_fn) = deserialize_fn {
4939 transform(deserialize_fn);
4940 }
4941 }
4942 HydroNode::Counter { duration, .. } => {
4943 transform(duration);
4944 }
4945 }
4946 }
4947
4948 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4949 &self.metadata().op
4950 }
4951
4952 pub fn metadata(&self) -> &HydroIrMetadata {
4953 match self {
4954 HydroNode::Placeholder => {
4955 panic!()
4956 }
4957 HydroNode::Cast { metadata, .. }
4958 | HydroNode::ObserveNonDet { metadata, .. }
4959 | HydroNode::AssertIsConsistent { metadata, .. }
4960 | HydroNode::UnboundSingleton { metadata, .. }
4961 | HydroNode::Source { metadata, .. }
4962 | HydroNode::SingletonSource { metadata, .. }
4963 | HydroNode::CycleSource { metadata, .. }
4964 | HydroNode::Tee { metadata, .. }
4965 | HydroNode::Reference { metadata, .. }
4966 | HydroNode::Partition { metadata, .. }
4967 | HydroNode::YieldConcat { metadata, .. }
4968 | HydroNode::BeginAtomic { metadata, .. }
4969 | HydroNode::EndAtomic { metadata, .. }
4970 | HydroNode::Batch { metadata, .. }
4971 | HydroNode::Chain { metadata, .. }
4972 | HydroNode::MergeOrdered { metadata, .. }
4973 | HydroNode::ChainFirst { metadata, .. }
4974 | HydroNode::CrossProduct { metadata, .. }
4975 | HydroNode::CrossSingleton { metadata, .. }
4976 | HydroNode::Join { metadata, .. }
4977 | HydroNode::JoinHalf { metadata, .. }
4978 | HydroNode::Difference { metadata, .. }
4979 | HydroNode::AntiJoin { metadata, .. }
4980 | HydroNode::ResolveFutures { metadata, .. }
4981 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4982 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4983 | HydroNode::Map { metadata, .. }
4984 | HydroNode::FlatMap { metadata, .. }
4985 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4986 | HydroNode::Filter { metadata, .. }
4987 | HydroNode::FilterMap { metadata, .. }
4988 | HydroNode::DeferTick { metadata, .. }
4989 | HydroNode::Enumerate { metadata, .. }
4990 | HydroNode::Inspect { metadata, .. }
4991 | HydroNode::Unique { metadata, .. }
4992 | HydroNode::Sort { metadata, .. }
4993 | HydroNode::Scan { metadata, .. }
4994 | HydroNode::ScanAsyncBlocking { metadata, .. }
4995 | HydroNode::Fold { metadata, .. }
4996 | HydroNode::FoldKeyed { metadata, .. }
4997 | HydroNode::Reduce { metadata, .. }
4998 | HydroNode::ReduceKeyed { metadata, .. }
4999 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5000 | HydroNode::ExternalInput { metadata, .. }
5001 | HydroNode::Network { metadata, .. }
5002 | HydroNode::Counter { metadata, .. } => metadata,
5003 }
5004 }
5005
5006 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5007 &mut self.metadata_mut().op
5008 }
5009
5010 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5011 match self {
5012 HydroNode::Placeholder => {
5013 panic!()
5014 }
5015 HydroNode::Cast { metadata, .. }
5016 | HydroNode::ObserveNonDet { metadata, .. }
5017 | HydroNode::AssertIsConsistent { metadata, .. }
5018 | HydroNode::UnboundSingleton { metadata, .. }
5019 | HydroNode::Source { metadata, .. }
5020 | HydroNode::SingletonSource { metadata, .. }
5021 | HydroNode::CycleSource { metadata, .. }
5022 | HydroNode::Tee { metadata, .. }
5023 | HydroNode::Reference { metadata, .. }
5024 | HydroNode::Partition { metadata, .. }
5025 | HydroNode::YieldConcat { metadata, .. }
5026 | HydroNode::BeginAtomic { metadata, .. }
5027 | HydroNode::EndAtomic { metadata, .. }
5028 | HydroNode::Batch { metadata, .. }
5029 | HydroNode::Chain { metadata, .. }
5030 | HydroNode::MergeOrdered { metadata, .. }
5031 | HydroNode::ChainFirst { metadata, .. }
5032 | HydroNode::CrossProduct { metadata, .. }
5033 | HydroNode::CrossSingleton { metadata, .. }
5034 | HydroNode::Join { metadata, .. }
5035 | HydroNode::JoinHalf { metadata, .. }
5036 | HydroNode::Difference { metadata, .. }
5037 | HydroNode::AntiJoin { metadata, .. }
5038 | HydroNode::ResolveFutures { metadata, .. }
5039 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5040 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5041 | HydroNode::Map { metadata, .. }
5042 | HydroNode::FlatMap { metadata, .. }
5043 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5044 | HydroNode::Filter { metadata, .. }
5045 | HydroNode::FilterMap { metadata, .. }
5046 | HydroNode::DeferTick { metadata, .. }
5047 | HydroNode::Enumerate { metadata, .. }
5048 | HydroNode::Inspect { metadata, .. }
5049 | HydroNode::Unique { metadata, .. }
5050 | HydroNode::Sort { metadata, .. }
5051 | HydroNode::Scan { metadata, .. }
5052 | HydroNode::ScanAsyncBlocking { metadata, .. }
5053 | HydroNode::Fold { metadata, .. }
5054 | HydroNode::FoldKeyed { metadata, .. }
5055 | HydroNode::Reduce { metadata, .. }
5056 | HydroNode::ReduceKeyed { metadata, .. }
5057 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5058 | HydroNode::ExternalInput { metadata, .. }
5059 | HydroNode::Network { metadata, .. }
5060 | HydroNode::Counter { metadata, .. } => metadata,
5061 }
5062 }
5063
5064 pub fn input(&self) -> Vec<&HydroNode> {
5065 match self {
5066 HydroNode::Placeholder => {
5067 panic!()
5068 }
5069 HydroNode::Source { .. }
5070 | HydroNode::SingletonSource { .. }
5071 | HydroNode::ExternalInput { .. }
5072 | HydroNode::CycleSource { .. }
5073 | HydroNode::Tee { .. }
5074 | HydroNode::Reference { .. }
5075 | HydroNode::Partition { .. } => {
5076 vec![]
5078 }
5079 HydroNode::Cast { inner, .. }
5080 | HydroNode::ObserveNonDet { inner, .. }
5081 | HydroNode::YieldConcat { inner, .. }
5082 | HydroNode::BeginAtomic { inner, .. }
5083 | HydroNode::EndAtomic { inner, .. }
5084 | HydroNode::Batch { inner, .. }
5085 | HydroNode::UnboundSingleton { inner, .. }
5086 | HydroNode::AssertIsConsistent { inner, .. } => {
5087 vec![inner]
5088 }
5089 HydroNode::Chain { first, second, .. } => {
5090 vec![first, second]
5091 }
5092 HydroNode::MergeOrdered { first, second, .. } => {
5093 vec![first, second]
5094 }
5095 HydroNode::ChainFirst { first, second, .. } => {
5096 vec![first, second]
5097 }
5098 HydroNode::CrossProduct { left, right, .. }
5099 | HydroNode::CrossSingleton { left, right, .. }
5100 | HydroNode::Join { left, right, .. }
5101 | HydroNode::JoinHalf { left, right, .. } => {
5102 vec![left, right]
5103 }
5104 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5105 vec![pos, neg]
5106 }
5107 HydroNode::Map { input, .. }
5108 | HydroNode::FlatMap { input, .. }
5109 | HydroNode::FlatMapStreamBlocking { input, .. }
5110 | HydroNode::Filter { input, .. }
5111 | HydroNode::FilterMap { input, .. }
5112 | HydroNode::Sort { input, .. }
5113 | HydroNode::DeferTick { input, .. }
5114 | HydroNode::Enumerate { input, .. }
5115 | HydroNode::Inspect { input, .. }
5116 | HydroNode::Unique { input, .. }
5117 | HydroNode::Network { input, .. }
5118 | HydroNode::Counter { input, .. }
5119 | HydroNode::ResolveFutures { input, .. }
5120 | HydroNode::ResolveFuturesBlocking { input, .. }
5121 | HydroNode::ResolveFuturesOrdered { input, .. }
5122 | HydroNode::Fold { input, .. }
5123 | HydroNode::FoldKeyed { input, .. }
5124 | HydroNode::Reduce { input, .. }
5125 | HydroNode::ReduceKeyed { input, .. }
5126 | HydroNode::Scan { input, .. }
5127 | HydroNode::ScanAsyncBlocking { input, .. } => {
5128 vec![input]
5129 }
5130 HydroNode::ReduceKeyedWatermark {
5131 input, watermark, ..
5132 } => {
5133 vec![input, watermark]
5134 }
5135 }
5136 }
5137
5138 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5139 self.input()
5140 .iter()
5141 .map(|input_node| input_node.metadata())
5142 .collect()
5143 }
5144
5145 pub fn is_shared_with_others(&self) -> bool {
5149 match self {
5150 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5151 Rc::strong_count(&inner.0) > 1
5152 }
5153 HydroNode::Reference { .. } => false,
5156 _ => false,
5157 }
5158 }
5159
5160 pub fn print_root(&self) -> String {
5161 match self {
5162 HydroNode::Placeholder => {
5163 panic!()
5164 }
5165 HydroNode::Cast { .. } => "Cast()".to_owned(),
5166 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5167 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5168 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5169 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5170 HydroNode::SingletonSource {
5171 value,
5172 first_tick_only,
5173 ..
5174 } => format!(
5175 "SingletonSource({:?}, first_tick_only={})",
5176 value, first_tick_only
5177 ),
5178 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5179 HydroNode::Tee { inner, .. } => {
5180 format!("Tee({})", inner.0.borrow().print_root())
5181 }
5182 HydroNode::Reference { inner, kind, .. } => {
5183 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5184 }
5185 HydroNode::Partition { f, is_true, .. } => {
5186 format!("Partition({:?}, is_true={})", f, is_true)
5187 }
5188 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5189 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5190 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5191 HydroNode::Batch { .. } => "Batch()".to_owned(),
5192 HydroNode::Chain { first, second, .. } => {
5193 format!("Chain({}, {})", first.print_root(), second.print_root())
5194 }
5195 HydroNode::MergeOrdered { first, second, .. } => {
5196 format!(
5197 "MergeOrdered({}, {})",
5198 first.print_root(),
5199 second.print_root()
5200 )
5201 }
5202 HydroNode::ChainFirst { first, second, .. } => {
5203 format!(
5204 "ChainFirst({}, {})",
5205 first.print_root(),
5206 second.print_root()
5207 )
5208 }
5209 HydroNode::CrossProduct { left, right, .. } => {
5210 format!(
5211 "CrossProduct({}, {})",
5212 left.print_root(),
5213 right.print_root()
5214 )
5215 }
5216 HydroNode::CrossSingleton { left, right, .. } => {
5217 format!(
5218 "CrossSingleton({}, {})",
5219 left.print_root(),
5220 right.print_root()
5221 )
5222 }
5223 HydroNode::Join { left, right, .. } => {
5224 format!("Join({}, {})", left.print_root(), right.print_root())
5225 }
5226 HydroNode::JoinHalf { left, right, .. } => {
5227 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5228 }
5229 HydroNode::Difference { pos, neg, .. } => {
5230 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5231 }
5232 HydroNode::AntiJoin { pos, neg, .. } => {
5233 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5234 }
5235 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5236 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5237 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5238 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5239 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5240 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5241 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5242 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5243 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5244 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5245 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5246 HydroNode::Unique { .. } => "Unique()".to_owned(),
5247 HydroNode::Sort { .. } => "Sort()".to_owned(),
5248 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5249 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5250 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5251 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5252 }
5253 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5254 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5255 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5256 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5257 HydroNode::Network { .. } => "Network()".to_owned(),
5258 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5259 HydroNode::Counter { tag, duration, .. } => {
5260 format!("Counter({:?}, {:?})", tag, duration)
5261 }
5262 }
5263 }
5264}
5265
5266#[cfg(feature = "build")]
5267fn instantiate_network<'a, D>(
5268 env: &mut D::InstantiateEnv,
5269 from_location: &LocationId,
5270 to_location: &LocationId,
5271 processes: &SparseSecondaryMap<LocationKey, D::Process>,
5272 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5273 name: Option<&str>,
5274 networking_info: &crate::networking::NetworkingInfo,
5275) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5276where
5277 D: Deploy<'a>,
5278{
5279 let ((sink, source), connect_fn) = match (from_location, to_location) {
5280 (&LocationId::Process(from), &LocationId::Process(to)) => {
5281 let from_node = processes
5282 .get(from)
5283 .unwrap_or_else(|| {
5284 panic!("A process used in the graph was not instantiated: {}", from)
5285 })
5286 .clone();
5287 let to_node = processes
5288 .get(to)
5289 .unwrap_or_else(|| {
5290 panic!("A process used in the graph was not instantiated: {}", to)
5291 })
5292 .clone();
5293
5294 let sink_port = from_node.next_port();
5295 let source_port = to_node.next_port();
5296
5297 (
5298 D::o2o_sink_source(
5299 env,
5300 &from_node,
5301 &sink_port,
5302 &to_node,
5303 &source_port,
5304 name,
5305 networking_info,
5306 ),
5307 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5308 )
5309 }
5310 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5311 let from_node = processes
5312 .get(from)
5313 .unwrap_or_else(|| {
5314 panic!("A process used in the graph was not instantiated: {}", from)
5315 })
5316 .clone();
5317 let to_node = clusters
5318 .get(to)
5319 .unwrap_or_else(|| {
5320 panic!("A cluster used in the graph was not instantiated: {}", to)
5321 })
5322 .clone();
5323
5324 let sink_port = from_node.next_port();
5325 let source_port = to_node.next_port();
5326
5327 (
5328 D::o2m_sink_source(
5329 env,
5330 &from_node,
5331 &sink_port,
5332 &to_node,
5333 &source_port,
5334 name,
5335 networking_info,
5336 ),
5337 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5338 )
5339 }
5340 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5341 let from_node = clusters
5342 .get(from)
5343 .unwrap_or_else(|| {
5344 panic!("A cluster used in the graph was not instantiated: {}", from)
5345 })
5346 .clone();
5347 let to_node = processes
5348 .get(to)
5349 .unwrap_or_else(|| {
5350 panic!("A process used in the graph was not instantiated: {}", to)
5351 })
5352 .clone();
5353
5354 let sink_port = from_node.next_port();
5355 let source_port = to_node.next_port();
5356
5357 (
5358 D::m2o_sink_source(
5359 env,
5360 &from_node,
5361 &sink_port,
5362 &to_node,
5363 &source_port,
5364 name,
5365 networking_info,
5366 ),
5367 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5368 )
5369 }
5370 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5371 let from_node = clusters
5372 .get(from)
5373 .unwrap_or_else(|| {
5374 panic!("A cluster used in the graph was not instantiated: {}", from)
5375 })
5376 .clone();
5377 let to_node = clusters
5378 .get(to)
5379 .unwrap_or_else(|| {
5380 panic!("A cluster used in the graph was not instantiated: {}", to)
5381 })
5382 .clone();
5383
5384 let sink_port = from_node.next_port();
5385 let source_port = to_node.next_port();
5386
5387 (
5388 D::m2m_sink_source(
5389 env,
5390 &from_node,
5391 &sink_port,
5392 &to_node,
5393 &source_port,
5394 name,
5395 networking_info,
5396 ),
5397 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5398 )
5399 }
5400 (LocationId::Tick(_, _), _) => panic!(),
5401 (_, LocationId::Tick(_, _)) => panic!(),
5402 (LocationId::Atomic(_), _) => panic!(),
5403 (_, LocationId::Atomic(_)) => panic!(),
5404 };
5405 (sink, source, connect_fn)
5406}
5407
5408#[cfg(test)]
5409mod serde_test;
5410
5411#[cfg(test)]
5412mod test {
5413 use std::mem::size_of;
5414
5415 use stageleft::{QuotedWithContext, q};
5416
5417 use super::*;
5418
5419 #[test]
5420 #[cfg_attr(
5421 not(feature = "build"),
5422 ignore = "expects inclusion of feature-gated fields"
5423 )]
5424 fn hydro_node_size() {
5425 assert_eq!(size_of::<HydroNode>(), 264);
5426 }
5427
5428 #[test]
5429 #[cfg_attr(
5430 not(feature = "build"),
5431 ignore = "expects inclusion of feature-gated fields"
5432 )]
5433 fn hydro_root_size() {
5434 assert_eq!(size_of::<HydroRoot>(), 136);
5435 }
5436
5437 #[test]
5438 fn test_simplify_q_macro_basic() {
5439 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5441 let result = simplify_q_macro(simple_expr.clone());
5442 assert_eq!(result, simple_expr);
5443 }
5444
5445 #[test]
5446 fn test_simplify_q_macro_actual_stageleft_call() {
5447 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5449 let result = simplify_q_macro(stageleft_call);
5450 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5453 }
5454
5455 #[test]
5456 fn test_closure_no_pipe_at_start() {
5457 let stageleft_call = q!({
5459 let foo = 123;
5460 move |b: usize| b + foo
5461 })
5462 .splice_fn1_ctx(&());
5463 let result = simplify_q_macro(stageleft_call);
5464 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5465 }
5466}