Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    /// The `access_counter` was assigned at staging time in code order.
49    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
227/// Wrapper that displays only the tokens of a parsed expr.
228///
229/// Boxes `syn::Type` which is ~240 bytes.
230#[derive(Clone, Hash)]
231pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
313/// Debug displays the type's tokens.
314///
315/// Boxes `syn::Type` which is ~320 bytes.
316#[derive(Clone, PartialEq, Eq, Hash)]
317pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
368pub enum DebugInstantiate {
369    Building,
370    Finalized(Box<DebugInstantiateFinalized>),
371}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
388#[cfg_attr(
389    not(feature = "build"),
390    expect(
391        dead_code,
392        reason = "sink, source unused without `feature = \"build\"`."
393    )
394)]
395pub struct DebugInstantiateFinalized {
396    sink: syn::Expr,
397    source: syn::Expr,
398    connect_fn: Option<Box<dyn FnOnce()>>,
399}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
413impl Hash for DebugInstantiate {
414    fn hash<H: Hasher>(&self, _state: &mut H) {
415        // Do nothing
416    }
417}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
430/// Tracks the instantiation state of a `ClusterMembers` source.
431///
432/// During `compile_network`, the first `ClusterMembers` node for a given
433/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
434/// receives the expression returned by `Deploy::cluster_membership_stream`.
435/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
436/// during code-gen they simply reference the tee output of the first node
437/// instead of creating a redundant `source_stream`.
438#[derive(Debug, Hash, Clone, serde::Serialize)]
439pub enum ClusterMembersState {
440    /// Not yet instantiated.
441    Uninit,
442    /// The primary instance: holds the stream expression and will emit
443    /// `source_stream(expr) -> tee()` during code-gen.
444    Stream(DebugExpr),
445    /// A secondary instance that references the tee output of the primary.
446    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
447    /// can derive the deterministic tee ident without extra state.
448    Tee(LocationId, LocationId),
449}
450
451/// A source in a Hydro graph, where data enters the graph.
452#[derive(Debug, Hash, Clone, serde::Serialize)]
453pub enum HydroSource {
454    Stream(DebugExpr),
455    ExternalNetwork(),
456    Iter(DebugExpr),
457    Spin(),
458    ClusterMembers(LocationId, ClusterMembersState),
459    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
460    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
461}
462
463#[cfg(feature = "build")]
464/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
465/// and simulations.
466///
467/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
468/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
469pub trait DfirBuilder {
470    /// Whether the representation of singletons should include intermediate states.
471    fn singleton_intermediates(&self) -> bool;
472
473    /// Gets the DFIR builder for the given location, creating it if necessary.
474    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
475
476    #[expect(clippy::too_many_arguments, reason = "TODO")]
477    fn batch(
478        &mut self,
479        in_ident: syn::Ident,
480        in_location: &LocationId,
481        in_kind: &CollectionKind,
482        out_ident: &syn::Ident,
483        out_location: &LocationId,
484        op_meta: &HydroIrOpMetadata,
485        fold_hooked_idents: &HashSet<String>,
486    );
487    fn yield_from_tick(
488        &mut self,
489        in_ident: syn::Ident,
490        in_location: &LocationId,
491        in_kind: &CollectionKind,
492        out_ident: &syn::Ident,
493        out_location: &LocationId,
494    );
495
496    fn begin_atomic(
497        &mut self,
498        in_ident: syn::Ident,
499        in_location: &LocationId,
500        in_kind: &CollectionKind,
501        out_ident: &syn::Ident,
502        out_location: &LocationId,
503        op_meta: &HydroIrOpMetadata,
504    );
505    fn end_atomic(
506        &mut self,
507        in_ident: syn::Ident,
508        in_location: &LocationId,
509        in_kind: &CollectionKind,
510        out_ident: &syn::Ident,
511    );
512
513    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
514    fn observe_nondet(
515        &mut self,
516        trusted: bool,
517        location: &LocationId,
518        in_ident: syn::Ident,
519        in_kind: &CollectionKind,
520        out_ident: &syn::Ident,
521        out_kind: &CollectionKind,
522        op_meta: &HydroIrOpMetadata,
523    );
524
525    #[expect(clippy::too_many_arguments, reason = "TODO")]
526    fn merge_ordered(
527        &mut self,
528        location: &LocationId,
529        first_ident: syn::Ident,
530        second_ident: syn::Ident,
531        out_ident: &syn::Ident,
532        in_kind: &CollectionKind,
533        op_meta: &HydroIrOpMetadata,
534        operator_tag: Option<&str>,
535    );
536
537    #[expect(clippy::too_many_arguments, reason = "TODO")]
538    fn create_network(
539        &mut self,
540        from: &LocationId,
541        to: &LocationId,
542        input_ident: syn::Ident,
543        out_ident: &syn::Ident,
544        serialize: Option<&DebugExpr>,
545        sink: syn::Expr,
546        source: syn::Expr,
547        deserialize: Option<&DebugExpr>,
548        tag_id: StmtId,
549        networking_info: &crate::networking::NetworkingInfo,
550    );
551
552    fn create_external_source(
553        &mut self,
554        on: &LocationId,
555        source_expr: syn::Expr,
556        out_ident: &syn::Ident,
557        deserialize: Option<&DebugExpr>,
558        tag_id: StmtId,
559    );
560
561    fn create_external_output(
562        &mut self,
563        on: &LocationId,
564        sink_expr: syn::Expr,
565        input_ident: &syn::Ident,
566        serialize: Option<&DebugExpr>,
567        tag_id: StmtId,
568    );
569
570    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
571    /// Returns the new input ident to use for the fold if a hook was emitted.
572    fn emit_fold_hook(
573        &mut self,
574        location: &LocationId,
575        in_ident: &syn::Ident,
576        in_kind: &CollectionKind,
577        op_meta: &HydroIrOpMetadata,
578    ) -> Option<syn::Ident>;
579
580    /// Inserts necessary code to validate a manual assertion that at this point the
581    /// input live collection is consistent. In production, this is a no-op, but in simulation
582    /// this will (not yet implemented) inject assertions that validate consistency.
583    fn assert_is_consistent(
584        &mut self,
585        trusted: bool,
586        location: &LocationId,
587        in_ident: syn::Ident,
588        out_ident: &syn::Ident,
589    );
590
591    /// Observes non-determinism introduced by a mut closure operating on a non-strict
592    /// (unordered / at-least-once) input. In production this is identity; in simulation
593    /// it delegates to `observe_nondet` with the strict output kind.
594    fn observe_for_mut(
595        &mut self,
596        location: &LocationId,
597        in_ident: syn::Ident,
598        in_kind: &CollectionKind,
599        out_ident: &syn::Ident,
600        op_meta: &HydroIrOpMetadata,
601    );
602}
603
604#[cfg(feature = "build")]
605impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
606    fn singleton_intermediates(&self) -> bool {
607        false
608    }
609
610    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
611        self.entry(location.root().key())
612            .expect("location was removed")
613            .or_default()
614    }
615
616    fn batch(
617        &mut self,
618        in_ident: syn::Ident,
619        in_location: &LocationId,
620        in_kind: &CollectionKind,
621        out_ident: &syn::Ident,
622        _out_location: &LocationId,
623        _op_meta: &HydroIrOpMetadata,
624        _fold_hooked_idents: &HashSet<String>,
625    ) {
626        let builder = self.get_dfir_mut(in_location.root());
627        if in_kind.is_bounded()
628            && matches!(
629                in_kind,
630                CollectionKind::Singleton { .. }
631                    | CollectionKind::Optional { .. }
632                    | CollectionKind::KeyedSingleton { .. }
633            )
634        {
635            assert!(in_location.is_top_level());
636            builder.add_dfir(
637                parse_quote! {
638                    #out_ident = #in_ident -> persist::<'static>();
639                },
640                None,
641                None,
642            );
643        } else {
644            builder.add_dfir(
645                parse_quote! {
646                    #out_ident = #in_ident;
647                },
648                None,
649                None,
650            );
651        }
652    }
653
654    fn yield_from_tick(
655        &mut self,
656        in_ident: syn::Ident,
657        in_location: &LocationId,
658        _in_kind: &CollectionKind,
659        out_ident: &syn::Ident,
660        _out_location: &LocationId,
661    ) {
662        let builder = self.get_dfir_mut(in_location.root());
663        builder.add_dfir(
664            parse_quote! {
665                #out_ident = #in_ident;
666            },
667            None,
668            None,
669        );
670    }
671
672    fn begin_atomic(
673        &mut self,
674        in_ident: syn::Ident,
675        in_location: &LocationId,
676        _in_kind: &CollectionKind,
677        out_ident: &syn::Ident,
678        _out_location: &LocationId,
679        _op_meta: &HydroIrOpMetadata,
680    ) {
681        let builder = self.get_dfir_mut(in_location.root());
682        builder.add_dfir(
683            parse_quote! {
684                #out_ident = #in_ident;
685            },
686            None,
687            None,
688        );
689    }
690
691    fn end_atomic(
692        &mut self,
693        in_ident: syn::Ident,
694        in_location: &LocationId,
695        _in_kind: &CollectionKind,
696        out_ident: &syn::Ident,
697    ) {
698        let builder = self.get_dfir_mut(in_location.root());
699        builder.add_dfir(
700            parse_quote! {
701                #out_ident = #in_ident;
702            },
703            None,
704            None,
705        );
706    }
707
708    fn observe_nondet(
709        &mut self,
710        _trusted: bool,
711        location: &LocationId,
712        in_ident: syn::Ident,
713        _in_kind: &CollectionKind,
714        out_ident: &syn::Ident,
715        _out_kind: &CollectionKind,
716        _op_meta: &HydroIrOpMetadata,
717    ) {
718        let builder = self.get_dfir_mut(location);
719        builder.add_dfir(
720            parse_quote! {
721                #out_ident = #in_ident;
722            },
723            None,
724            None,
725        );
726    }
727
728    fn merge_ordered(
729        &mut self,
730        location: &LocationId,
731        first_ident: syn::Ident,
732        second_ident: syn::Ident,
733        out_ident: &syn::Ident,
734        _in_kind: &CollectionKind,
735        _op_meta: &HydroIrOpMetadata,
736        operator_tag: Option<&str>,
737    ) {
738        let builder = self.get_dfir_mut(location);
739        builder.add_dfir(
740            parse_quote! {
741                #out_ident = union();
742                #first_ident -> [0]#out_ident;
743                #second_ident -> [1]#out_ident;
744            },
745            None,
746            operator_tag,
747        );
748    }
749
750    fn create_network(
751        &mut self,
752        from: &LocationId,
753        to: &LocationId,
754        input_ident: syn::Ident,
755        out_ident: &syn::Ident,
756        serialize: Option<&DebugExpr>,
757        sink: syn::Expr,
758        source: syn::Expr,
759        deserialize: Option<&DebugExpr>,
760        tag_id: StmtId,
761        _networking_info: &crate::networking::NetworkingInfo,
762    ) {
763        let sender_builder = self.get_dfir_mut(from);
764        if let Some(serialize_pipeline) = serialize {
765            sender_builder.add_dfir(
766                parse_quote! {
767                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
768                },
769                None,
770                // operator tag separates send and receive, which otherwise have the same next_stmt_id
771                Some(&format!("send{}", tag_id)),
772            );
773        } else {
774            sender_builder.add_dfir(
775                parse_quote! {
776                    #input_ident -> dest_sink(#sink);
777                },
778                None,
779                Some(&format!("send{}", tag_id)),
780            );
781        }
782
783        let receiver_builder = self.get_dfir_mut(to);
784        if let Some(deserialize_pipeline) = deserialize {
785            receiver_builder.add_dfir(
786                parse_quote! {
787                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
788                },
789                None,
790                Some(&format!("recv{}", tag_id)),
791            );
792        } else {
793            receiver_builder.add_dfir(
794                parse_quote! {
795                    #out_ident = source_stream(#source);
796                },
797                None,
798                Some(&format!("recv{}", tag_id)),
799            );
800        }
801    }
802
803    fn create_external_source(
804        &mut self,
805        on: &LocationId,
806        source_expr: syn::Expr,
807        out_ident: &syn::Ident,
808        deserialize: Option<&DebugExpr>,
809        tag_id: StmtId,
810    ) {
811        let receiver_builder = self.get_dfir_mut(on);
812        if let Some(deserialize_pipeline) = deserialize {
813            receiver_builder.add_dfir(
814                parse_quote! {
815                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
816                },
817                None,
818                Some(&format!("recv{}", tag_id)),
819            );
820        } else {
821            receiver_builder.add_dfir(
822                parse_quote! {
823                    #out_ident = source_stream(#source_expr);
824                },
825                None,
826                Some(&format!("recv{}", tag_id)),
827            );
828        }
829    }
830
831    fn create_external_output(
832        &mut self,
833        on: &LocationId,
834        sink_expr: syn::Expr,
835        input_ident: &syn::Ident,
836        serialize: Option<&DebugExpr>,
837        tag_id: StmtId,
838    ) {
839        let sender_builder = self.get_dfir_mut(on);
840        if let Some(serialize_fn) = serialize {
841            sender_builder.add_dfir(
842                parse_quote! {
843                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
844                },
845                None,
846                // operator tag separates send and receive, which otherwise have the same next_stmt_id
847                Some(&format!("send{}", tag_id)),
848            );
849        } else {
850            sender_builder.add_dfir(
851                parse_quote! {
852                    #input_ident -> dest_sink(#sink_expr);
853                },
854                None,
855                Some(&format!("send{}", tag_id)),
856            );
857        }
858    }
859
860    fn emit_fold_hook(
861        &mut self,
862        _location: &LocationId,
863        _in_ident: &syn::Ident,
864        _in_kind: &CollectionKind,
865        _op_meta: &HydroIrOpMetadata,
866    ) -> Option<syn::Ident> {
867        None
868    }
869
870    fn assert_is_consistent(
871        &mut self,
872        _trusted: bool,
873        location: &LocationId,
874        in_ident: syn::Ident,
875        out_ident: &syn::Ident,
876    ) {
877        let builder = self.get_dfir_mut(location);
878        builder.add_dfir(
879            parse_quote! {
880                #out_ident = #in_ident;
881            },
882            None,
883            None,
884        );
885    }
886
887    fn observe_for_mut(
888        &mut self,
889        location: &LocationId,
890        in_ident: syn::Ident,
891        _in_kind: &CollectionKind,
892        out_ident: &syn::Ident,
893        _op_meta: &HydroIrOpMetadata,
894    ) {
895        let builder = self.get_dfir_mut(location);
896        builder.add_dfir(
897            parse_quote! {
898                #out_ident = #in_ident;
899            },
900            None,
901            None,
902        );
903    }
904}
905
906#[cfg(feature = "build")]
907pub enum BuildersOrCallback<'a, L, N>
908where
909    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
910    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
911{
912    Builders(&'a mut dyn DfirBuilder),
913    Callback(L, N),
914}
915
916/// An root in a Hydro graph, which is an pipeline that doesn't emit
917/// any downstream values. Traversals over the dataflow graph and
918/// generating DFIR IR start from roots.
919#[derive(Debug, Hash, serde::Serialize)]
920pub enum HydroRoot {
921    ForEach {
922        f: ClosureExpr,
923        input: Box<HydroNode>,
924        op_metadata: HydroIrOpMetadata,
925    },
926    SendExternal {
927        to_external_key: LocationKey,
928        to_port_id: ExternalPortId,
929        to_many: bool,
930        unpaired: bool,
931        serialize_fn: Option<DebugExpr>,
932        instantiate_fn: DebugInstantiate,
933        input: Box<HydroNode>,
934        op_metadata: HydroIrOpMetadata,
935    },
936    DestSink {
937        sink: DebugExpr,
938        input: Box<HydroNode>,
939        op_metadata: HydroIrOpMetadata,
940    },
941    CycleSink {
942        cycle_id: CycleId,
943        input: Box<HydroNode>,
944        op_metadata: HydroIrOpMetadata,
945    },
946    EmbeddedOutput {
947        #[serde(serialize_with = "serialize_ident")]
948        ident: syn::Ident,
949        input: Box<HydroNode>,
950        op_metadata: HydroIrOpMetadata,
951    },
952    Null {
953        input: Box<HydroNode>,
954        op_metadata: HydroIrOpMetadata,
955    },
956}
957
958impl HydroRoot {
959    #[cfg(feature = "build")]
960    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961    pub fn compile_network<'a, D>(
962        &mut self,
963        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964        seen_tees: &mut SeenSharedNodes,
965        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966        processes: &SparseSecondaryMap<LocationKey, D::Process>,
967        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968        externals: &SparseSecondaryMap<LocationKey, D::External>,
969        env: &mut D::InstantiateEnv,
970    ) where
971        D: Deploy<'a>,
972    {
973        let refcell_extra_stmts = RefCell::new(extra_stmts);
974        let refcell_env = RefCell::new(env);
975        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976        self.transform_bottom_up(
977            &mut |l| {
978                if let HydroRoot::SendExternal {
979                    input,
980                    to_external_key,
981                    to_port_id,
982                    to_many,
983                    unpaired,
984                    instantiate_fn,
985                    ..
986                } = l
987                {
988                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
989                        DebugInstantiate::Building => {
990                            let to_node = externals
991                                .get(*to_external_key)
992                                .unwrap_or_else(|| {
993                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
994                                })
995                                .clone();
996
997                            match input.metadata().location_id.root() {
998                                &LocationId::Process(process_key) => {
999                                    if *to_many {
1000                                        (
1001                                            (
1002                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1003                                                parse_quote!(DUMMY),
1004                                            ),
1005                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1006                                        )
1007                                    } else {
1008                                        let from_node = processes
1009                                            .get(process_key)
1010                                            .unwrap_or_else(|| {
1011                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1012                                            })
1013                                            .clone();
1014
1015                                        let sink_port = from_node.next_port();
1016                                        let source_port = to_node.next_port();
1017
1018                                        if *unpaired {
1019                                            use stageleft::quote_type;
1020                                            use tokio_util::codec::LengthDelimitedCodec;
1021
1022                                            to_node.register(*to_port_id, source_port.clone());
1023
1024                                            let _ = D::e2o_source(
1025                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1026                                                &to_node, &source_port,
1027                                                &from_node, &sink_port,
1028                                                &quote_type::<LengthDelimitedCodec>(),
1029                                                format!("{}_{}", *to_external_key, *to_port_id)
1030                                            );
1031                                        }
1032
1033                                        (
1034                                            (
1035                                                D::o2e_sink(
1036                                                    &from_node,
1037                                                    &sink_port,
1038                                                    &to_node,
1039                                                    &source_port,
1040                                                    format!("{}_{}", *to_external_key, *to_port_id)
1041                                                ),
1042                                                parse_quote!(DUMMY),
1043                                            ),
1044                                            if *unpaired {
1045                                                D::e2o_connect(
1046                                                    &to_node,
1047                                                    &source_port,
1048                                                    &from_node,
1049                                                    &sink_port,
1050                                                    *to_many,
1051                                                    NetworkHint::Auto,
1052                                                )
1053                                            } else {
1054                                                Box::new(|| {}) as Box<dyn FnOnce()>
1055                                            },
1056                                        )
1057                                    }
1058                                }
1059                                LocationId::Cluster(cluster_key) => {
1060                                    let from_node = clusters
1061                                        .get(*cluster_key)
1062                                        .unwrap_or_else(|| {
1063                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1064                                        })
1065                                        .clone();
1066
1067                                    let sink_port = from_node.next_port();
1068                                    let source_port = to_node.next_port();
1069
1070                                    if *unpaired {
1071                                        to_node.register(*to_port_id, source_port.clone());
1072                                    }
1073
1074                                    (
1075                                        (
1076                                            D::m2e_sink(
1077                                                &from_node,
1078                                                &sink_port,
1079                                                &to_node,
1080                                                &source_port,
1081                                                format!("{}_{}", *to_external_key, *to_port_id)
1082                                            ),
1083                                            parse_quote!(DUMMY),
1084                                        ),
1085                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1086                                    )
1087                                }
1088                                _ => panic!()
1089                            }
1090                        },
1091
1092                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1093                    };
1094
1095                    *instantiate_fn = DebugInstantiateFinalized {
1096                        sink: sink_expr,
1097                        source: source_expr,
1098                        connect_fn: Some(connect_fn),
1099                    }
1100                    .into();
1101                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1102                    let element_type = match &input.metadata().collection_kind {
1103                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1104                        _ => panic!("Embedded output must have Stream collection kind"),
1105                    };
1106                    let location_key = match input.metadata().location_id.root() {
1107                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108                        _ => panic!("Embedded output must be on a process or cluster"),
1109                    };
1110                    D::register_embedded_output(
1111                        &mut refcell_env.borrow_mut(),
1112                        location_key,
1113                        ident,
1114                        &element_type,
1115                    );
1116                }
1117            },
1118            &mut |n| {
1119                if let HydroNode::Network {
1120                    name,
1121                    networking_info,
1122                    input,
1123                    instantiate_fn,
1124                    metadata,
1125                    ..
1126                } = n
1127                {
1128                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1129                        DebugInstantiate::Building => instantiate_network::<D>(
1130                            &mut refcell_env.borrow_mut(),
1131                            input.metadata().location_id.root(),
1132                            metadata.location_id.root(),
1133                            processes,
1134                            clusters,
1135                            name.as_deref(),
1136                            networking_info,
1137                        ),
1138
1139                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1140                    };
1141
1142                    *instantiate_fn = DebugInstantiateFinalized {
1143                        sink: sink_expr,
1144                        source: source_expr,
1145                        connect_fn: Some(connect_fn),
1146                    }
1147                    .into();
1148                } else if let HydroNode::ExternalInput {
1149                    from_external_key,
1150                    from_port_id,
1151                    from_many,
1152                    codec_type,
1153                    port_hint,
1154                    instantiate_fn,
1155                    metadata,
1156                    ..
1157                } = n
1158                {
1159                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1160                        DebugInstantiate::Building => {
1161                            let from_node = externals
1162                                .get(*from_external_key)
1163                                .unwrap_or_else(|| {
1164                                    panic!(
1165                                        "A external used in the graph was not instantiated: {}",
1166                                        from_external_key,
1167                                    )
1168                                })
1169                                .clone();
1170
1171                            match metadata.location_id.root() {
1172                                &LocationId::Process(process_key) => {
1173                                    let to_node = processes
1174                                        .get(process_key)
1175                                        .unwrap_or_else(|| {
1176                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1177                                        })
1178                                        .clone();
1179
1180                                    let sink_port = from_node.next_port();
1181                                    let source_port = to_node.next_port();
1182
1183                                    from_node.register(*from_port_id, sink_port.clone());
1184
1185                                    (
1186                                        (
1187                                            parse_quote!(DUMMY),
1188                                            if *from_many {
1189                                                D::e2o_many_source(
1190                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1191                                                    &to_node, &source_port,
1192                                                    codec_type.0.as_ref(),
1193                                                    format!("{}_{}", *from_external_key, *from_port_id)
1194                                                )
1195                                            } else {
1196                                                D::e2o_source(
1197                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1198                                                    &from_node, &sink_port,
1199                                                    &to_node, &source_port,
1200                                                    codec_type.0.as_ref(),
1201                                                    format!("{}_{}", *from_external_key, *from_port_id)
1202                                                )
1203                                            },
1204                                        ),
1205                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1206                                    )
1207                                }
1208                                LocationId::Cluster(cluster_key) => {
1209                                    let to_node = clusters
1210                                        .get(*cluster_key)
1211                                        .unwrap_or_else(|| {
1212                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1213                                        })
1214                                        .clone();
1215
1216                                    let sink_port = from_node.next_port();
1217                                    let source_port = to_node.next_port();
1218
1219                                    from_node.register(*from_port_id, sink_port.clone());
1220
1221                                    (
1222                                        (
1223                                            parse_quote!(DUMMY),
1224                                            D::e2m_source(
1225                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1226                                                &from_node, &sink_port,
1227                                                &to_node, &source_port,
1228                                                codec_type.0.as_ref(),
1229                                                format!("{}_{}", *from_external_key, *from_port_id)
1230                                            ),
1231                                        ),
1232                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1233                                    )
1234                                }
1235                                _ => panic!()
1236                            }
1237                        },
1238
1239                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1240                    };
1241
1242                    *instantiate_fn = DebugInstantiateFinalized {
1243                        sink: sink_expr,
1244                        source: source_expr,
1245                        connect_fn: Some(connect_fn),
1246                    }
1247                    .into();
1248                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1249                    let element_type = match &metadata.collection_kind {
1250                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1251                        _ => panic!("Embedded source must have Stream collection kind"),
1252                    };
1253                    let location_key = match metadata.location_id.root() {
1254                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1255                        _ => panic!("Embedded source must be on a process or cluster"),
1256                    };
1257                    D::register_embedded_stream_input(
1258                        &mut refcell_env.borrow_mut(),
1259                        location_key,
1260                        ident,
1261                        &element_type,
1262                    );
1263                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1264                    let element_type = match &metadata.collection_kind {
1265                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1266                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1267                    };
1268                    let location_key = match metadata.location_id.root() {
1269                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1271                    };
1272                    D::register_embedded_singleton_input(
1273                        &mut refcell_env.borrow_mut(),
1274                        location_key,
1275                        ident,
1276                        &element_type,
1277                    );
1278                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1279                    match state {
1280                        ClusterMembersState::Uninit => {
1281                            let at_location = metadata.location_id.root().clone();
1282                            let key = (at_location.clone(), location_id.key());
1283                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1284                                // First occurrence: call cluster_membership_stream and mark as Stream.
1285                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1286                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1287                                    &(),
1288                                );
1289                                *state = ClusterMembersState::Stream(expr.into());
1290                            } else {
1291                                // Already instantiated for this (at, target) pair: just tee.
1292                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1293                            }
1294                        }
1295                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1296                            panic!("cluster members already finalized");
1297                        }
1298                    }
1299                }
1300            },
1301            seen_tees,
1302            false,
1303        );
1304    }
1305
1306    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1307        self.transform_bottom_up(
1308            &mut |l| {
1309                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1310                    match instantiate_fn {
1311                        DebugInstantiate::Building => panic!("network not built"),
1312
1313                        DebugInstantiate::Finalized(finalized) => {
1314                            (finalized.connect_fn.take().unwrap())();
1315                        }
1316                    }
1317                }
1318            },
1319            &mut |n| {
1320                if let HydroNode::Network { instantiate_fn, .. }
1321                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1322                {
1323                    match instantiate_fn {
1324                        DebugInstantiate::Building => panic!("network not built"),
1325
1326                        DebugInstantiate::Finalized(finalized) => {
1327                            (finalized.connect_fn.take().unwrap())();
1328                        }
1329                    }
1330                }
1331            },
1332            seen_tees,
1333            false,
1334        );
1335    }
1336
1337    pub fn transform_bottom_up(
1338        &mut self,
1339        transform_root: &mut impl FnMut(&mut HydroRoot),
1340        transform_node: &mut impl FnMut(&mut HydroNode),
1341        seen_tees: &mut SeenSharedNodes,
1342        check_well_formed: bool,
1343    ) {
1344        self.transform_children(
1345            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1346            seen_tees,
1347        );
1348
1349        transform_root(self);
1350    }
1351
1352    pub fn transform_children(
1353        &mut self,
1354        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1355        seen_tees: &mut SeenSharedNodes,
1356    ) {
1357        match self {
1358            HydroRoot::ForEach { f, input, .. } => {
1359                f.transform_children(&mut transform, seen_tees);
1360                transform(input, seen_tees);
1361            }
1362            HydroRoot::SendExternal { input, .. }
1363            | HydroRoot::DestSink { input, .. }
1364            | HydroRoot::CycleSink { input, .. }
1365            | HydroRoot::EmbeddedOutput { input, .. }
1366            | HydroRoot::Null { input, .. } => {
1367                transform(input, seen_tees);
1368            }
1369        }
1370    }
1371
1372    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1373        match self {
1374            HydroRoot::ForEach {
1375                f,
1376                input,
1377                op_metadata,
1378            } => HydroRoot::ForEach {
1379                f: f.deep_clone(seen_tees),
1380                input: Box::new(input.deep_clone(seen_tees)),
1381                op_metadata: op_metadata.clone(),
1382            },
1383            HydroRoot::SendExternal {
1384                to_external_key,
1385                to_port_id,
1386                to_many,
1387                unpaired,
1388                serialize_fn,
1389                instantiate_fn,
1390                input,
1391                op_metadata,
1392            } => HydroRoot::SendExternal {
1393                to_external_key: *to_external_key,
1394                to_port_id: *to_port_id,
1395                to_many: *to_many,
1396                unpaired: *unpaired,
1397                serialize_fn: serialize_fn.clone(),
1398                instantiate_fn: instantiate_fn.clone(),
1399                input: Box::new(input.deep_clone(seen_tees)),
1400                op_metadata: op_metadata.clone(),
1401            },
1402            HydroRoot::DestSink {
1403                sink,
1404                input,
1405                op_metadata,
1406            } => HydroRoot::DestSink {
1407                sink: sink.clone(),
1408                input: Box::new(input.deep_clone(seen_tees)),
1409                op_metadata: op_metadata.clone(),
1410            },
1411            HydroRoot::CycleSink {
1412                cycle_id,
1413                input,
1414                op_metadata,
1415            } => HydroRoot::CycleSink {
1416                cycle_id: *cycle_id,
1417                input: Box::new(input.deep_clone(seen_tees)),
1418                op_metadata: op_metadata.clone(),
1419            },
1420            HydroRoot::EmbeddedOutput {
1421                ident,
1422                input,
1423                op_metadata,
1424            } => HydroRoot::EmbeddedOutput {
1425                ident: ident.clone(),
1426                input: Box::new(input.deep_clone(seen_tees)),
1427                op_metadata: op_metadata.clone(),
1428            },
1429            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1430                input: Box::new(input.deep_clone(seen_tees)),
1431                op_metadata: op_metadata.clone(),
1432            },
1433        }
1434    }
1435
1436    #[cfg(feature = "build")]
1437    pub fn emit(
1438        &mut self,
1439        graph_builders: &mut dyn DfirBuilder,
1440        seen_tees: &mut SeenSharedNodes,
1441        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1442        next_stmt_id: &mut crate::Counter<StmtId>,
1443        fold_hooked_idents: &mut HashSet<String>,
1444    ) {
1445        self.emit_core(
1446            &mut BuildersOrCallback::<
1447                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1448                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1449            >::Builders(graph_builders),
1450            seen_tees,
1451            built_tees,
1452            next_stmt_id,
1453            fold_hooked_idents,
1454        );
1455    }
1456
1457    #[cfg(feature = "build")]
1458    pub fn emit_core(
1459        &mut self,
1460        builders_or_callback: &mut BuildersOrCallback<
1461            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1462            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1463        >,
1464        seen_tees: &mut SeenSharedNodes,
1465        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1466        next_stmt_id: &mut crate::Counter<StmtId>,
1467        fold_hooked_idents: &mut HashSet<String>,
1468    ) {
1469        match self {
1470            HydroRoot::ForEach { f, input, .. } => {
1471                let input_ident = input.emit_core(
1472                    builders_or_callback,
1473                    seen_tees,
1474                    built_tees,
1475                    next_stmt_id,
1476                    fold_hooked_idents,
1477                );
1478
1479                let input_ident = maybe_observe_for_mut(
1480                    f,
1481                    input_ident,
1482                    &input.metadata().location_id,
1483                    &input.metadata().collection_kind,
1484                    &input.metadata().op,
1485                    builders_or_callback,
1486                    next_stmt_id,
1487                );
1488
1489                let stmt_id = next_stmt_id.get_and_increment();
1490
1491                match builders_or_callback {
1492                    BuildersOrCallback::Builders(graph_builders) => {
1493                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1494
1495                        // Look up each captured ref's ident from built_tees
1496                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1497                            let HydroNode::Reference { inner, .. } = ref_node else {
1498                                panic!("singleton_refs should only contain HydroNode::Reference");
1499                            };
1500                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1501                            let idents = built_tees.get(&ptr).expect(
1502                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1503                            );
1504                            ident_stack.push(idents[0].clone());
1505                        }
1506
1507                        let f_tokens = f.emit_tokens(&mut ident_stack);
1508
1509                        graph_builders
1510                            .get_dfir_mut(&input.metadata().location_id)
1511                            .add_dfir(
1512                                parse_quote! {
1513                                    #input_ident -> for_each(#f_tokens);
1514                                },
1515                                None,
1516                                Some(&stmt_id.to_string()),
1517                            );
1518                    }
1519                    BuildersOrCallback::Callback(leaf_callback, _) => {
1520                        leaf_callback(self, next_stmt_id);
1521                    }
1522                }
1523            }
1524
1525            HydroRoot::SendExternal {
1526                serialize_fn,
1527                instantiate_fn,
1528                input,
1529                ..
1530            } => {
1531                let input_ident = input.emit_core(
1532                    builders_or_callback,
1533                    seen_tees,
1534                    built_tees,
1535                    next_stmt_id,
1536                    fold_hooked_idents,
1537                );
1538
1539                let stmt_id = next_stmt_id.get_and_increment();
1540
1541                match builders_or_callback {
1542                    BuildersOrCallback::Builders(graph_builders) => {
1543                        let (sink_expr, _) = match instantiate_fn {
1544                            DebugInstantiate::Building => (
1545                                syn::parse_quote!(DUMMY_SINK),
1546                                syn::parse_quote!(DUMMY_SOURCE),
1547                            ),
1548
1549                            DebugInstantiate::Finalized(finalized) => {
1550                                (finalized.sink.clone(), finalized.source.clone())
1551                            }
1552                        };
1553
1554                        graph_builders.create_external_output(
1555                            &input.metadata().location_id,
1556                            sink_expr,
1557                            &input_ident,
1558                            serialize_fn.as_ref(),
1559                            stmt_id,
1560                        );
1561                    }
1562                    BuildersOrCallback::Callback(leaf_callback, _) => {
1563                        leaf_callback(self, next_stmt_id);
1564                    }
1565                }
1566            }
1567
1568            HydroRoot::DestSink { sink, input, .. } => {
1569                let input_ident = input.emit_core(
1570                    builders_or_callback,
1571                    seen_tees,
1572                    built_tees,
1573                    next_stmt_id,
1574                    fold_hooked_idents,
1575                );
1576
1577                let stmt_id = next_stmt_id.get_and_increment();
1578
1579                match builders_or_callback {
1580                    BuildersOrCallback::Builders(graph_builders) => {
1581                        graph_builders
1582                            .get_dfir_mut(&input.metadata().location_id)
1583                            .add_dfir(
1584                                parse_quote! {
1585                                    #input_ident -> dest_sink(#sink);
1586                                },
1587                                None,
1588                                Some(&stmt_id.to_string()),
1589                            );
1590                    }
1591                    BuildersOrCallback::Callback(leaf_callback, _) => {
1592                        leaf_callback(self, next_stmt_id);
1593                    }
1594                }
1595            }
1596
1597            HydroRoot::CycleSink {
1598                cycle_id, input, ..
1599            } => {
1600                let input_ident = input.emit_core(
1601                    builders_or_callback,
1602                    seen_tees,
1603                    built_tees,
1604                    next_stmt_id,
1605                    fold_hooked_idents,
1606                );
1607
1608                match builders_or_callback {
1609                    BuildersOrCallback::Builders(graph_builders) => {
1610                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1611                            CollectionKind::KeyedSingleton {
1612                                key_type,
1613                                value_type,
1614                                ..
1615                            }
1616                            | CollectionKind::KeyedStream {
1617                                key_type,
1618                                value_type,
1619                                ..
1620                            } => {
1621                                parse_quote!((#key_type, #value_type))
1622                            }
1623                            CollectionKind::Stream { element_type, .. }
1624                            | CollectionKind::Singleton { element_type, .. }
1625                            | CollectionKind::Optional { element_type, .. } => {
1626                                parse_quote!(#element_type)
1627                            }
1628                        };
1629
1630                        let cycle_id_ident = cycle_id.as_ident();
1631                        graph_builders
1632                            .get_dfir_mut(&input.metadata().location_id)
1633                            .add_dfir(
1634                                parse_quote! {
1635                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1636                                },
1637                                None,
1638                                None,
1639                            );
1640                    }
1641                    // No ID, no callback
1642                    BuildersOrCallback::Callback(_, _) => {}
1643                }
1644            }
1645
1646            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1647                let input_ident = input.emit_core(
1648                    builders_or_callback,
1649                    seen_tees,
1650                    built_tees,
1651                    next_stmt_id,
1652                    fold_hooked_idents,
1653                );
1654
1655                let stmt_id = next_stmt_id.get_and_increment();
1656
1657                match builders_or_callback {
1658                    BuildersOrCallback::Builders(graph_builders) => {
1659                        graph_builders
1660                            .get_dfir_mut(&input.metadata().location_id)
1661                            .add_dfir(
1662                                parse_quote! {
1663                                    #input_ident -> for_each(&mut #ident);
1664                                },
1665                                None,
1666                                Some(&stmt_id.to_string()),
1667                            );
1668                    }
1669                    BuildersOrCallback::Callback(leaf_callback, _) => {
1670                        leaf_callback(self, next_stmt_id);
1671                    }
1672                }
1673            }
1674
1675            HydroRoot::Null { input, .. } => {
1676                let input_ident = input.emit_core(
1677                    builders_or_callback,
1678                    seen_tees,
1679                    built_tees,
1680                    next_stmt_id,
1681                    fold_hooked_idents,
1682                );
1683
1684                let stmt_id = next_stmt_id.get_and_increment();
1685
1686                match builders_or_callback {
1687                    BuildersOrCallback::Builders(graph_builders) => {
1688                        graph_builders
1689                            .get_dfir_mut(&input.metadata().location_id)
1690                            .add_dfir(
1691                                parse_quote! {
1692                                    #input_ident -> for_each(|_| {});
1693                                },
1694                                None,
1695                                Some(&stmt_id.to_string()),
1696                            );
1697                    }
1698                    BuildersOrCallback::Callback(leaf_callback, _) => {
1699                        leaf_callback(self, next_stmt_id);
1700                    }
1701                }
1702            }
1703        }
1704    }
1705
1706    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1707        match self {
1708            HydroRoot::ForEach { op_metadata, .. }
1709            | HydroRoot::SendExternal { op_metadata, .. }
1710            | HydroRoot::DestSink { op_metadata, .. }
1711            | HydroRoot::CycleSink { op_metadata, .. }
1712            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1713            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1714        }
1715    }
1716
1717    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1718        match self {
1719            HydroRoot::ForEach { op_metadata, .. }
1720            | HydroRoot::SendExternal { op_metadata, .. }
1721            | HydroRoot::DestSink { op_metadata, .. }
1722            | HydroRoot::CycleSink { op_metadata, .. }
1723            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1724            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1725        }
1726    }
1727
1728    pub fn input(&self) -> &HydroNode {
1729        match self {
1730            HydroRoot::ForEach { input, .. }
1731            | HydroRoot::SendExternal { input, .. }
1732            | HydroRoot::DestSink { input, .. }
1733            | HydroRoot::CycleSink { input, .. }
1734            | HydroRoot::EmbeddedOutput { input, .. }
1735            | HydroRoot::Null { input, .. } => input,
1736        }
1737    }
1738
1739    pub fn input_metadata(&self) -> &HydroIrMetadata {
1740        self.input().metadata()
1741    }
1742
1743    pub fn print_root(&self) -> String {
1744        match self {
1745            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1746            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1747            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1748            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1749            HydroRoot::EmbeddedOutput { ident, .. } => {
1750                format!("EmbeddedOutput({})", ident)
1751            }
1752            HydroRoot::Null { .. } => "Null".to_owned(),
1753        }
1754    }
1755
1756    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1757        match self {
1758            HydroRoot::ForEach { f, .. } => {
1759                transform(&mut f.expr);
1760            }
1761            HydroRoot::DestSink { sink, .. } => {
1762                transform(sink);
1763            }
1764            HydroRoot::SendExternal { .. }
1765            | HydroRoot::CycleSink { .. }
1766            | HydroRoot::EmbeddedOutput { .. }
1767            | HydroRoot::Null { .. } => {}
1768        }
1769    }
1770}
1771
1772#[cfg(feature = "build")]
1773fn tick_of(loc: &LocationId) -> Option<ClockId> {
1774    match loc {
1775        LocationId::Tick(id, _) => Some(*id),
1776        LocationId::Atomic(inner) => tick_of(inner),
1777        _ => None,
1778    }
1779}
1780
1781#[cfg(feature = "build")]
1782fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1783    match loc {
1784        LocationId::Tick(id, inner) => {
1785            *id = uf_find(uf, *id);
1786            remap_location(inner, uf);
1787        }
1788        LocationId::Atomic(inner) => {
1789            remap_location(inner, uf);
1790        }
1791        LocationId::Process(_) | LocationId::Cluster(_) => {}
1792    }
1793}
1794
1795#[cfg(feature = "build")]
1796fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1797    let p = *parent.get(&x).unwrap_or(&x);
1798    if p == x {
1799        return x;
1800    }
1801    let root = uf_find(parent, p);
1802    parent.insert(x, root);
1803    root
1804}
1805
1806#[cfg(feature = "build")]
1807fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1808    let ra = uf_find(parent, a);
1809    let rb = uf_find(parent, b);
1810    if ra != rb {
1811        parent.insert(ra, rb);
1812    }
1813}
1814
1815/// Traverse the IR to build a union-find that unifies tick IDs connected
1816/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1817/// rewrite all `LocationId`s to use the representative tick ID.
1818#[cfg(feature = "build")]
1819pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1820    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1821
1822    // Pass 1: collect unifications.
1823    transform_bottom_up(
1824        ir,
1825        &mut |_| {},
1826        &mut |node: &mut HydroNode| match node {
1827            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1828                if let (Some(a), Some(b)) = (
1829                    tick_of(&inner.metadata().location_id),
1830                    tick_of(&metadata.location_id),
1831                ) {
1832                    uf_union(&mut uf, a, b);
1833                }
1834            }
1835            HydroNode::Chain {
1836                first,
1837                second,
1838                metadata,
1839            }
1840            | HydroNode::ChainFirst {
1841                first,
1842                second,
1843                metadata,
1844            }
1845            | HydroNode::MergeOrdered {
1846                first,
1847                second,
1848                metadata,
1849            } => {
1850                if let (Some(a), Some(b)) = (
1851                    tick_of(&first.metadata().location_id),
1852                    tick_of(&metadata.location_id),
1853                ) {
1854                    uf_union(&mut uf, a, b);
1855                }
1856                if let (Some(a), Some(b)) = (
1857                    tick_of(&second.metadata().location_id),
1858                    tick_of(&metadata.location_id),
1859                ) {
1860                    uf_union(&mut uf, a, b);
1861                }
1862            }
1863            _ => {}
1864        },
1865        false,
1866    );
1867
1868    // Pass 2: rewrite all LocationIds.
1869    transform_bottom_up(
1870        ir,
1871        &mut |_| {},
1872        &mut |node: &mut HydroNode| {
1873            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1874        },
1875        false,
1876    );
1877}
1878
1879#[cfg(feature = "build")]
1880pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1881    let mut builders = SecondaryMap::new();
1882    let mut seen_tees = HashMap::new();
1883    let mut built_tees = HashMap::new();
1884    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1885    let mut fold_hooked_idents = HashSet::new();
1886    for leaf in ir {
1887        leaf.emit(
1888            &mut builders,
1889            &mut seen_tees,
1890            &mut built_tees,
1891            &mut next_stmt_id,
1892            &mut fold_hooked_idents,
1893        );
1894    }
1895    builders
1896}
1897
1898#[cfg(feature = "build")]
1899pub fn traverse_dfir(
1900    ir: &mut [HydroRoot],
1901    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1902    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1903) {
1904    let mut seen_tees = HashMap::new();
1905    let mut built_tees = HashMap::new();
1906    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1907    let mut fold_hooked_idents = HashSet::new();
1908    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1909    ir.iter_mut().for_each(|leaf| {
1910        leaf.emit_core(
1911            &mut callback,
1912            &mut seen_tees,
1913            &mut built_tees,
1914            &mut next_stmt_id,
1915            &mut fold_hooked_idents,
1916        );
1917    });
1918}
1919
1920pub fn transform_bottom_up(
1921    ir: &mut [HydroRoot],
1922    transform_root: &mut impl FnMut(&mut HydroRoot),
1923    transform_node: &mut impl FnMut(&mut HydroNode),
1924    check_well_formed: bool,
1925) {
1926    let mut seen_tees = HashMap::new();
1927    ir.iter_mut().for_each(|leaf| {
1928        leaf.transform_bottom_up(
1929            transform_root,
1930            transform_node,
1931            &mut seen_tees,
1932            check_well_formed,
1933        );
1934    });
1935}
1936
1937pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1938    let mut seen_tees = HashMap::new();
1939    ir.iter()
1940        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1941        .collect()
1942}
1943
1944type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1945thread_local! {
1946    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1947    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1948    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1949    /// on subsequent encounters, preventing infinite loops.
1950    static SERIALIZED_SHARED: PrintedTees
1951        = const { RefCell::new(None) };
1952}
1953
1954pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1955    PRINTED_TEES.with(|printed_tees| {
1956        let mut printed_tees_mut = printed_tees.borrow_mut();
1957        *printed_tees_mut = Some((0, HashMap::new()));
1958        drop(printed_tees_mut);
1959
1960        let ret = f();
1961
1962        let mut printed_tees_mut = printed_tees.borrow_mut();
1963        *printed_tees_mut = None;
1964
1965        ret
1966    })
1967}
1968
1969/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1970/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1971/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1972/// back-reference.  The tracking state is restored when `f` returns or panics.
1973pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1974    let _guard = SerializedSharedGuard::enter();
1975    f()
1976}
1977
1978/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1979/// making `serialize_dedup_shared` re-entrant and panic-safe.
1980struct SerializedSharedGuard {
1981    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1982}
1983
1984impl SerializedSharedGuard {
1985    fn enter() -> Self {
1986        let previous = SERIALIZED_SHARED.with(|cell| {
1987            let mut guard = cell.borrow_mut();
1988            guard.replace((0, HashMap::new()))
1989        });
1990        Self { previous }
1991    }
1992}
1993
1994impl Drop for SerializedSharedGuard {
1995    fn drop(&mut self) {
1996        SERIALIZED_SHARED.with(|cell| {
1997            *cell.borrow_mut() = self.previous.take();
1998        });
1999    }
2000}
2001
2002pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2003
2004impl serde::Serialize for SharedNode {
2005    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2006    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2007    /// same subtree every time and, if the graph ever contains a cycle, loop
2008    /// forever.
2009    ///
2010    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2011    /// integer id.  The first time we see a pointer we assign it the next id and
2012    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2013    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2014    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2015    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2016        SERIALIZED_SHARED.with(|cell| {
2017            let mut guard = cell.borrow_mut();
2018            // (next_id, pointer → assigned_id)
2019            let state = guard.as_mut().ok_or_else(|| {
2020                serde::ser::Error::custom(
2021                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2022                )
2023            })?;
2024            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2025
2026            if let Some(&id) = state.1.get(&ptr) {
2027                drop(guard);
2028                use serde::ser::SerializeMap;
2029                let mut map = serializer.serialize_map(Some(1))?;
2030                map.serialize_entry("$shared_ref", &id)?;
2031                map.end()
2032            } else {
2033                let id = state.0;
2034                state.0 += 1;
2035                state.1.insert(ptr, id);
2036                drop(guard);
2037
2038                use serde::ser::SerializeMap;
2039                let mut map = serializer.serialize_map(Some(2))?;
2040                map.serialize_entry("$shared", &id)?;
2041                map.serialize_entry("node", &*self.0.borrow())?;
2042                map.end()
2043            }
2044        })
2045    }
2046}
2047
2048impl SharedNode {
2049    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2050        Rc::as_ptr(&self.0)
2051    }
2052}
2053
2054impl Debug for SharedNode {
2055    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2056        PRINTED_TEES.with(|printed_tees| {
2057            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2058            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2059
2060            if let Some(printed_tees_mut) = printed_tees_mut {
2061                if let Some(existing) = printed_tees_mut
2062                    .1
2063                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2064                {
2065                    write!(f, "<shared {}>", existing)
2066                } else {
2067                    let next_id = printed_tees_mut.0;
2068                    printed_tees_mut.0 += 1;
2069                    printed_tees_mut
2070                        .1
2071                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2072                    drop(printed_tees_mut_borrow);
2073                    write!(f, "<shared {}>: ", next_id)?;
2074                    Debug::fmt(&self.0.borrow(), f)
2075                }
2076            } else {
2077                drop(printed_tees_mut_borrow);
2078                write!(f, "<shared>: ")?;
2079                Debug::fmt(&self.0.borrow(), f)
2080            }
2081        })
2082    }
2083}
2084
2085impl Hash for SharedNode {
2086    fn hash<H: Hasher>(&self, state: &mut H) {
2087        self.0.borrow_mut().hash(state);
2088    }
2089}
2090
2091/// A counter for tracking singleton access groups on a `HydroNode::Reference`.
2092///
2093/// Each mutable access increments the counter (before and after) to isolate itself in its own group;
2094/// immutable accesses share the current group.
2095#[derive(Debug)]
2096pub enum AccessCounter {
2097    Counting(Cell<u32>),
2098    Frozen(u32),
2099}
2100
2101impl AccessCounter {
2102    pub fn new() -> Self {
2103        Self::Counting(Cell::new(0))
2104    }
2105
2106    /// Assign the next access group for this reference.
2107    /// Mutable accesses get an isolated group (counter increments before and after).
2108    /// Immutable accesses share the current group.
2109    pub fn next_group(&self, is_mut: bool) -> Self {
2110        let AccessCounter::Counting(count) = self else {
2111            panic!("Cannot count on `AccessCounter::Frozen`");
2112        };
2113        let c = if is_mut {
2114            let c = count.get() + 1;
2115            count.set(c + 1);
2116            c
2117        } else {
2118            count.get()
2119        };
2120        Self::Frozen(c)
2121    }
2122
2123    /// Creates a frozen counter to prevent further counting.
2124    pub fn freeze(&self) -> Self {
2125        Self::Frozen(match self {
2126            Self::Counting(count) => count.get(),
2127            Self::Frozen(count) => *count,
2128        })
2129    }
2130
2131    pub fn frozen_group(&self) -> u32 {
2132        let Self::Frozen(count) = self else {
2133            panic!("`AccessCounter` not frozen");
2134        };
2135        *count
2136    }
2137}
2138
2139impl Default for AccessCounter {
2140    fn default() -> Self {
2141        Self::new()
2142    }
2143}
2144
2145impl Hash for AccessCounter {
2146    fn hash<H: Hasher>(&self, _state: &mut H) {
2147        // Access counter does not participate in hashing — it is runtime bookkeeping.
2148    }
2149}
2150
2151impl serde::Serialize for AccessCounter {
2152    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2153        let count = match self {
2154            AccessCounter::Counting(count) => count.get(),
2155            AccessCounter::Frozen(count) => *count,
2156        };
2157        count.serialize(serializer)
2158    }
2159}
2160
2161#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2162pub enum BoundKind {
2163    Unbounded,
2164    Bounded,
2165}
2166
2167#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2168pub enum StreamOrder {
2169    NoOrder,
2170    TotalOrder,
2171}
2172
2173#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2174pub enum StreamRetry {
2175    AtLeastOnce,
2176    ExactlyOnce,
2177}
2178
2179#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2180pub enum KeyedSingletonBoundKind {
2181    Unbounded,
2182    MonotonicKeys,
2183    MonotonicValue,
2184    BoundedValue,
2185    Bounded,
2186}
2187
2188#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2189pub enum SingletonBoundKind {
2190    Unbounded,
2191    Monotonic,
2192    Bounded,
2193}
2194
2195#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2196pub enum CollectionKind {
2197    Stream {
2198        bound: BoundKind,
2199        order: StreamOrder,
2200        retry: StreamRetry,
2201        element_type: DebugType,
2202    },
2203    Singleton {
2204        bound: SingletonBoundKind,
2205        element_type: DebugType,
2206    },
2207    Optional {
2208        bound: BoundKind,
2209        element_type: DebugType,
2210    },
2211    KeyedStream {
2212        bound: BoundKind,
2213        value_order: StreamOrder,
2214        value_retry: StreamRetry,
2215        key_type: DebugType,
2216        value_type: DebugType,
2217    },
2218    KeyedSingleton {
2219        bound: KeyedSingletonBoundKind,
2220        key_type: DebugType,
2221        value_type: DebugType,
2222    },
2223}
2224
2225impl CollectionKind {
2226    pub fn is_bounded(&self) -> bool {
2227        matches!(
2228            self,
2229            CollectionKind::Stream {
2230                bound: BoundKind::Bounded,
2231                ..
2232            } | CollectionKind::Singleton {
2233                bound: SingletonBoundKind::Bounded,
2234                ..
2235            } | CollectionKind::Optional {
2236                bound: BoundKind::Bounded,
2237                ..
2238            } | CollectionKind::KeyedStream {
2239                bound: BoundKind::Bounded,
2240                ..
2241            } | CollectionKind::KeyedSingleton {
2242                bound: KeyedSingletonBoundKind::Bounded,
2243                ..
2244            }
2245        )
2246    }
2247
2248    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2249    /// meaning no non-determinism needs to be observed for mut closures.
2250    pub fn is_strict(&self) -> bool {
2251        match self {
2252            CollectionKind::Stream { order, retry, .. } => {
2253                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2254            }
2255            CollectionKind::KeyedStream {
2256                value_order,
2257                value_retry,
2258                ..
2259            } => {
2260                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2261            }
2262            // Singletons/Optionals/KeyedSingletons do not have observable
2263            // non-determinism other than snapshots / batching
2264            CollectionKind::Singleton { .. }
2265            | CollectionKind::Optional { .. }
2266            | CollectionKind::KeyedSingleton { .. } => true,
2267        }
2268    }
2269
2270    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2271    pub fn strict_kind(&self) -> CollectionKind {
2272        match self {
2273            CollectionKind::Stream {
2274                bound,
2275                element_type,
2276                ..
2277            } => CollectionKind::Stream {
2278                bound: bound.clone(),
2279                order: StreamOrder::TotalOrder,
2280                retry: StreamRetry::ExactlyOnce,
2281                element_type: element_type.clone(),
2282            },
2283            CollectionKind::KeyedStream {
2284                bound,
2285                key_type,
2286                value_type,
2287                ..
2288            } => CollectionKind::KeyedStream {
2289                bound: bound.clone(),
2290                value_order: StreamOrder::TotalOrder,
2291                value_retry: StreamRetry::ExactlyOnce,
2292                key_type: key_type.clone(),
2293                value_type: value_type.clone(),
2294            },
2295            other => other.clone(),
2296        }
2297    }
2298}
2299
2300#[derive(Clone, serde::Serialize)]
2301pub struct HydroIrMetadata {
2302    pub location_id: LocationId,
2303    pub collection_kind: CollectionKind,
2304    pub consistency: Option<ClusterConsistency>,
2305    pub cardinality: Option<usize>,
2306    pub tag: Option<String>,
2307    pub op: HydroIrOpMetadata,
2308}
2309
2310// HydroIrMetadata shouldn't be used to hash or compare
2311impl Hash for HydroIrMetadata {
2312    fn hash<H: Hasher>(&self, _: &mut H) {}
2313}
2314
2315impl PartialEq for HydroIrMetadata {
2316    fn eq(&self, _: &Self) -> bool {
2317        true
2318    }
2319}
2320
2321impl Eq for HydroIrMetadata {}
2322
2323impl Debug for HydroIrMetadata {
2324    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2325        f.debug_struct("HydroIrMetadata")
2326            .field("location_id", &self.location_id)
2327            .field("collection_kind", &self.collection_kind)
2328            .finish()
2329    }
2330}
2331
2332/// Metadata that is specific to the operator itself, rather than its outputs.
2333/// This is available on _both_ inner nodes and roots.
2334#[derive(Clone, serde::Serialize)]
2335pub struct HydroIrOpMetadata {
2336    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2337    pub backtrace: Backtrace,
2338    pub cpu_usage: Option<f64>,
2339    pub network_recv_cpu_usage: Option<f64>,
2340    pub id: Option<usize>,
2341}
2342
2343impl HydroIrOpMetadata {
2344    #[expect(
2345        clippy::new_without_default,
2346        reason = "explicit calls to new ensure correct backtrace bounds"
2347    )]
2348    pub fn new() -> HydroIrOpMetadata {
2349        Self::new_with_skip(1)
2350    }
2351
2352    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2353        HydroIrOpMetadata {
2354            backtrace: Backtrace::get_backtrace(2 + skip_count),
2355            cpu_usage: None,
2356            network_recv_cpu_usage: None,
2357            id: None,
2358        }
2359    }
2360}
2361
2362impl Debug for HydroIrOpMetadata {
2363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2364        f.debug_struct("HydroIrOpMetadata").finish()
2365    }
2366}
2367
2368impl Hash for HydroIrOpMetadata {
2369    fn hash<H: Hasher>(&self, _: &mut H) {}
2370}
2371
2372/// An intermediate node in a Hydro graph, which consumes data
2373/// from upstream nodes and emits data to downstream nodes.
2374#[derive(Debug, Hash, serde::Serialize)]
2375pub enum HydroNode {
2376    Placeholder,
2377
2378    /// Manually "casts" between two different collection kinds.
2379    ///
2380    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2381    /// correctness checks. In particular, the user must ensure that every possible
2382    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2383    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2384    /// collection. This ensures that the simulator does not miss any possible outputs.
2385    Cast {
2386        inner: Box<HydroNode>,
2387        metadata: HydroIrMetadata,
2388    },
2389
2390    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2391    /// interpretation of the input stream.
2392    ///
2393    /// In production, this simply passes through the input, but in simulation, this operator
2394    /// explicitly selects a randomized interpretation.
2395    ObserveNonDet {
2396        inner: Box<HydroNode>,
2397        trusted: bool, // if true, we do not need to simulate non-determinism
2398        metadata: HydroIrMetadata,
2399    },
2400
2401    Source {
2402        source: HydroSource,
2403        metadata: HydroIrMetadata,
2404    },
2405
2406    SingletonSource {
2407        value: DebugExpr,
2408        first_tick_only: bool,
2409        metadata: HydroIrMetadata,
2410    },
2411
2412    CycleSource {
2413        cycle_id: CycleId,
2414        metadata: HydroIrMetadata,
2415    },
2416
2417    Tee {
2418        inner: SharedNode,
2419        metadata: HydroIrMetadata,
2420    },
2421
2422    /// A reference materialization point. Wraps a SharedNode so that:
2423    /// - The pipe output delivers data to one consumer
2424    /// - `#var` references can borrow the value from the slot
2425    ///
2426    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2427    /// `-> handoff()` depending on `kind`.
2428    ///
2429    /// Uses the same `built_tees` dedup pattern as `Tee`.
2430    Reference {
2431        inner: SharedNode,
2432        kind: crate::handoff_ref::HandoffRefKind,
2433        access_counter: AccessCounter,
2434        metadata: HydroIrMetadata,
2435    },
2436
2437    Partition {
2438        inner: SharedNode,
2439        f: ClosureExpr,
2440        is_true: bool,
2441        metadata: HydroIrMetadata,
2442    },
2443
2444    BeginAtomic {
2445        inner: Box<HydroNode>,
2446        metadata: HydroIrMetadata,
2447    },
2448
2449    EndAtomic {
2450        inner: Box<HydroNode>,
2451        metadata: HydroIrMetadata,
2452    },
2453
2454    Batch {
2455        inner: Box<HydroNode>,
2456        metadata: HydroIrMetadata,
2457    },
2458
2459    YieldConcat {
2460        inner: Box<HydroNode>,
2461        metadata: HydroIrMetadata,
2462    },
2463
2464    Chain {
2465        first: Box<HydroNode>,
2466        second: Box<HydroNode>,
2467        metadata: HydroIrMetadata,
2468    },
2469
2470    MergeOrdered {
2471        first: Box<HydroNode>,
2472        second: Box<HydroNode>,
2473        metadata: HydroIrMetadata,
2474    },
2475
2476    ChainFirst {
2477        first: Box<HydroNode>,
2478        second: Box<HydroNode>,
2479        metadata: HydroIrMetadata,
2480    },
2481
2482    CrossProduct {
2483        left: Box<HydroNode>,
2484        right: Box<HydroNode>,
2485        metadata: HydroIrMetadata,
2486    },
2487
2488    CrossSingleton {
2489        left: Box<HydroNode>,
2490        right: Box<HydroNode>,
2491        metadata: HydroIrMetadata,
2492    },
2493
2494    Join {
2495        left: Box<HydroNode>,
2496        right: Box<HydroNode>,
2497        metadata: HydroIrMetadata,
2498    },
2499
2500    /// Asymmetric join where the right (build) side is bounded.
2501    /// The build side is accumulated (stratum-delayed) into a hash table,
2502    /// then the left (probe) side streams through preserving its ordering.
2503    JoinHalf {
2504        left: Box<HydroNode>,
2505        right: Box<HydroNode>,
2506        metadata: HydroIrMetadata,
2507    },
2508
2509    Difference {
2510        pos: Box<HydroNode>,
2511        neg: Box<HydroNode>,
2512        metadata: HydroIrMetadata,
2513    },
2514
2515    AntiJoin {
2516        pos: Box<HydroNode>,
2517        neg: Box<HydroNode>,
2518        metadata: HydroIrMetadata,
2519    },
2520
2521    ResolveFutures {
2522        input: Box<HydroNode>,
2523        metadata: HydroIrMetadata,
2524    },
2525    ResolveFuturesBlocking {
2526        input: Box<HydroNode>,
2527        metadata: HydroIrMetadata,
2528    },
2529    ResolveFuturesOrdered {
2530        input: Box<HydroNode>,
2531        metadata: HydroIrMetadata,
2532    },
2533
2534    Map {
2535        f: ClosureExpr,
2536        input: Box<HydroNode>,
2537        metadata: HydroIrMetadata,
2538    },
2539    FlatMap {
2540        f: ClosureExpr,
2541        input: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544    FlatMapStreamBlocking {
2545        f: ClosureExpr,
2546        input: Box<HydroNode>,
2547        metadata: HydroIrMetadata,
2548    },
2549    Filter {
2550        f: ClosureExpr,
2551        input: Box<HydroNode>,
2552        metadata: HydroIrMetadata,
2553    },
2554    FilterMap {
2555        f: ClosureExpr,
2556        input: Box<HydroNode>,
2557        metadata: HydroIrMetadata,
2558    },
2559
2560    DeferTick {
2561        input: Box<HydroNode>,
2562        metadata: HydroIrMetadata,
2563    },
2564    Enumerate {
2565        input: Box<HydroNode>,
2566        metadata: HydroIrMetadata,
2567    },
2568    Inspect {
2569        f: ClosureExpr,
2570        input: Box<HydroNode>,
2571        metadata: HydroIrMetadata,
2572    },
2573
2574    Unique {
2575        input: Box<HydroNode>,
2576        metadata: HydroIrMetadata,
2577    },
2578
2579    Sort {
2580        input: Box<HydroNode>,
2581        metadata: HydroIrMetadata,
2582    },
2583    Fold {
2584        init: ClosureExpr,
2585        acc: ClosureExpr,
2586        input: Box<HydroNode>,
2587        metadata: HydroIrMetadata,
2588    },
2589
2590    Scan {
2591        init: ClosureExpr,
2592        acc: ClosureExpr,
2593        input: Box<HydroNode>,
2594        metadata: HydroIrMetadata,
2595    },
2596    ScanAsyncBlocking {
2597        init: ClosureExpr,
2598        acc: ClosureExpr,
2599        input: Box<HydroNode>,
2600        metadata: HydroIrMetadata,
2601    },
2602    FoldKeyed {
2603        init: ClosureExpr,
2604        acc: ClosureExpr,
2605        input: Box<HydroNode>,
2606        metadata: HydroIrMetadata,
2607    },
2608
2609    Reduce {
2610        f: ClosureExpr,
2611        input: Box<HydroNode>,
2612        metadata: HydroIrMetadata,
2613    },
2614    ReduceKeyed {
2615        f: ClosureExpr,
2616        input: Box<HydroNode>,
2617        metadata: HydroIrMetadata,
2618    },
2619    ReduceKeyedWatermark {
2620        f: ClosureExpr,
2621        input: Box<HydroNode>,
2622        watermark: Box<HydroNode>,
2623        metadata: HydroIrMetadata,
2624    },
2625
2626    Network {
2627        name: Option<String>,
2628        networking_info: crate::networking::NetworkingInfo,
2629        serialize_fn: Option<DebugExpr>,
2630        instantiate_fn: DebugInstantiate,
2631        deserialize_fn: Option<DebugExpr>,
2632        input: Box<HydroNode>,
2633        metadata: HydroIrMetadata,
2634    },
2635
2636    ExternalInput {
2637        from_external_key: LocationKey,
2638        from_port_id: ExternalPortId,
2639        from_many: bool,
2640        codec_type: DebugType,
2641        #[serde(skip)]
2642        port_hint: NetworkHint,
2643        instantiate_fn: DebugInstantiate,
2644        deserialize_fn: Option<DebugExpr>,
2645        metadata: HydroIrMetadata,
2646    },
2647
2648    Counter {
2649        tag: String,
2650        duration: DebugExpr,
2651        prefix: String,
2652        input: Box<HydroNode>,
2653        metadata: HydroIrMetadata,
2654    },
2655
2656    AssertIsConsistent {
2657        inner: Box<HydroNode>,
2658        trusted: bool,
2659        metadata: HydroIrMetadata,
2660    },
2661
2662    UnboundSingleton {
2663        inner: Box<HydroNode>,
2664        metadata: HydroIrMetadata,
2665    },
2666}
2667
2668pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2669pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2670
2671/// If `f` has a mut singleton ref and `in_kind` is non-strict, emits an
2672/// `observe_for_mut` node and returns the new ident. Otherwise returns
2673/// `in_ident` unchanged. Always consumes a stmt_id when applicable.
2674#[cfg(feature = "build")]
2675fn maybe_observe_for_mut(
2676    f: &ClosureExpr,
2677    in_ident: syn::Ident,
2678    in_location: &LocationId,
2679    in_kind: &CollectionKind,
2680    op_meta: &HydroIrOpMetadata,
2681    builders_or_callback: &mut BuildersOrCallback<
2682        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
2683        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
2684    >,
2685    next_stmt_id: &mut crate::Counter<StmtId>,
2686) -> syn::Ident {
2687    if f.has_mut_ref() && !in_kind.is_strict() {
2688        let observe_stmt_id = next_stmt_id.get_and_increment();
2689        let observe_ident =
2690            syn::Ident::new(&format!("stream_{}", observe_stmt_id), Span::call_site());
2691        if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
2692            graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
2693        }
2694        observe_ident
2695    } else {
2696        in_ident
2697    }
2698}
2699
2700impl HydroNode {
2701    pub fn transform_bottom_up(
2702        &mut self,
2703        transform: &mut impl FnMut(&mut HydroNode),
2704        seen_tees: &mut SeenSharedNodes,
2705        check_well_formed: bool,
2706    ) {
2707        self.transform_children(
2708            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2709            seen_tees,
2710        );
2711
2712        transform(self);
2713
2714        let self_location = self.metadata().location_id.root();
2715
2716        if check_well_formed {
2717            match &*self {
2718                HydroNode::Network { .. } => {}
2719                _ => {
2720                    self.input_metadata().iter().for_each(|i| {
2721                        if i.location_id.root() != self_location {
2722                            panic!(
2723                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2724                                i,
2725                                i.location_id.root(),
2726                                self,
2727                                self_location
2728                            )
2729                        }
2730                    });
2731                }
2732            }
2733        }
2734    }
2735
2736    #[inline(always)]
2737    pub fn transform_children(
2738        &mut self,
2739        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2740        seen_tees: &mut SeenSharedNodes,
2741    ) {
2742        match self {
2743            HydroNode::Placeholder => {
2744                panic!();
2745            }
2746
2747            HydroNode::Source { .. }
2748            | HydroNode::SingletonSource { .. }
2749            | HydroNode::CycleSource { .. }
2750            | HydroNode::ExternalInput { .. } => {}
2751
2752            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2753                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2754                    *inner = SharedNode(transformed.clone());
2755                } else {
2756                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2757                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2758                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2759                    transform(&mut orig, seen_tees);
2760                    *transformed_cell.borrow_mut() = orig;
2761                    *inner = SharedNode(transformed_cell);
2762                }
2763            }
2764
2765            HydroNode::Partition { inner, f, .. } => {
2766                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2767                    *inner = SharedNode(transformed.clone());
2768                } else {
2769                    f.transform_children(&mut transform, seen_tees);
2770                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2771                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2772                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2773                    transform(&mut orig, seen_tees);
2774                    *transformed_cell.borrow_mut() = orig;
2775                    *inner = SharedNode(transformed_cell);
2776                }
2777            }
2778
2779            HydroNode::Cast { inner, .. }
2780            | HydroNode::ObserveNonDet { inner, .. }
2781            | HydroNode::BeginAtomic { inner, .. }
2782            | HydroNode::EndAtomic { inner, .. }
2783            | HydroNode::Batch { inner, .. }
2784            | HydroNode::YieldConcat { inner, .. }
2785            | HydroNode::UnboundSingleton { inner, .. }
2786            | HydroNode::AssertIsConsistent { inner, .. } => {
2787                transform(inner.as_mut(), seen_tees);
2788            }
2789
2790            HydroNode::Chain { first, second, .. } => {
2791                transform(first.as_mut(), seen_tees);
2792                transform(second.as_mut(), seen_tees);
2793            }
2794
2795            HydroNode::MergeOrdered { first, second, .. } => {
2796                transform(first.as_mut(), seen_tees);
2797                transform(second.as_mut(), seen_tees);
2798            }
2799
2800            HydroNode::ChainFirst { first, second, .. } => {
2801                transform(first.as_mut(), seen_tees);
2802                transform(second.as_mut(), seen_tees);
2803            }
2804
2805            HydroNode::CrossSingleton { left, right, .. }
2806            | HydroNode::CrossProduct { left, right, .. }
2807            | HydroNode::Join { left, right, .. }
2808            | HydroNode::JoinHalf { left, right, .. } => {
2809                transform(left.as_mut(), seen_tees);
2810                transform(right.as_mut(), seen_tees);
2811            }
2812
2813            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2814                transform(pos.as_mut(), seen_tees);
2815                transform(neg.as_mut(), seen_tees);
2816            }
2817
2818            HydroNode::Map { f, input, .. } => {
2819                f.transform_children(&mut transform, seen_tees);
2820                transform(input.as_mut(), seen_tees);
2821            }
2822            HydroNode::FlatMap { f, input, .. }
2823            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2824            | HydroNode::Filter { f, input, .. }
2825            | HydroNode::FilterMap { f, input, .. }
2826            | HydroNode::Inspect { f, input, .. }
2827            | HydroNode::Reduce { f, input, .. }
2828            | HydroNode::ReduceKeyed { f, input, .. } => {
2829                f.transform_children(&mut transform, seen_tees);
2830                transform(input.as_mut(), seen_tees);
2831            }
2832            HydroNode::ReduceKeyedWatermark {
2833                f,
2834                input,
2835                watermark,
2836                ..
2837            } => {
2838                f.transform_children(&mut transform, seen_tees);
2839                transform(input.as_mut(), seen_tees);
2840                transform(watermark.as_mut(), seen_tees);
2841            }
2842            HydroNode::Fold {
2843                init, acc, input, ..
2844            }
2845            | HydroNode::Scan {
2846                init, acc, input, ..
2847            }
2848            | HydroNode::ScanAsyncBlocking {
2849                init, acc, input, ..
2850            }
2851            | HydroNode::FoldKeyed {
2852                init, acc, input, ..
2853            } => {
2854                init.transform_children(&mut transform, seen_tees);
2855                acc.transform_children(&mut transform, seen_tees);
2856                transform(input.as_mut(), seen_tees);
2857            }
2858            HydroNode::ResolveFutures { input, .. }
2859            | HydroNode::ResolveFuturesBlocking { input, .. }
2860            | HydroNode::ResolveFuturesOrdered { input, .. }
2861            | HydroNode::Sort { input, .. }
2862            | HydroNode::DeferTick { input, .. }
2863            | HydroNode::Enumerate { input, .. }
2864            | HydroNode::Unique { input, .. }
2865            | HydroNode::Network { input, .. }
2866            | HydroNode::Counter { input, .. } => {
2867                transform(input.as_mut(), seen_tees);
2868            }
2869        }
2870    }
2871
2872    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2873        match self {
2874            HydroNode::Placeholder => HydroNode::Placeholder,
2875            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2876                inner: Box::new(inner.deep_clone(seen_tees)),
2877                metadata: metadata.clone(),
2878            },
2879            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2880                inner: Box::new(inner.deep_clone(seen_tees)),
2881                metadata: metadata.clone(),
2882            },
2883            HydroNode::ObserveNonDet {
2884                inner,
2885                trusted,
2886                metadata,
2887            } => HydroNode::ObserveNonDet {
2888                inner: Box::new(inner.deep_clone(seen_tees)),
2889                trusted: *trusted,
2890                metadata: metadata.clone(),
2891            },
2892            HydroNode::AssertIsConsistent {
2893                inner,
2894                trusted,
2895                metadata,
2896            } => HydroNode::AssertIsConsistent {
2897                inner: Box::new(inner.deep_clone(seen_tees)),
2898                trusted: *trusted,
2899                metadata: metadata.clone(),
2900            },
2901            HydroNode::Source { source, metadata } => HydroNode::Source {
2902                source: source.clone(),
2903                metadata: metadata.clone(),
2904            },
2905            HydroNode::SingletonSource {
2906                value,
2907                first_tick_only,
2908                metadata,
2909            } => HydroNode::SingletonSource {
2910                value: value.clone(),
2911                first_tick_only: *first_tick_only,
2912                metadata: metadata.clone(),
2913            },
2914            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2915                cycle_id: *cycle_id,
2916                metadata: metadata.clone(),
2917            },
2918            HydroNode::Tee { inner, metadata }
2919            | HydroNode::Reference {
2920                inner, metadata, ..
2921            } => {
2922                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2923                    SharedNode(transformed.clone())
2924                } else {
2925                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2926                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2927                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2928                    *new_rc.borrow_mut() = cloned;
2929                    SharedNode(new_rc)
2930                };
2931                if let HydroNode::Reference {
2932                    kind,
2933                    access_counter,
2934                    ..
2935                } = self
2936                {
2937                    HydroNode::Reference {
2938                        inner: cloned_inner,
2939                        kind: *kind,
2940                        access_counter: access_counter.freeze(),
2941                        metadata: metadata.clone(),
2942                    }
2943                } else {
2944                    HydroNode::Tee {
2945                        inner: cloned_inner,
2946                        metadata: metadata.clone(),
2947                    }
2948                }
2949            }
2950            HydroNode::Partition {
2951                inner,
2952                f,
2953                is_true,
2954                metadata,
2955            } => {
2956                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2957                    HydroNode::Partition {
2958                        inner: SharedNode(transformed.clone()),
2959                        f: f.deep_clone(seen_tees),
2960                        is_true: *is_true,
2961                        metadata: metadata.clone(),
2962                    }
2963                } else {
2964                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2965                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2966                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2967                    *new_rc.borrow_mut() = cloned;
2968                    HydroNode::Partition {
2969                        inner: SharedNode(new_rc),
2970                        f: f.deep_clone(seen_tees),
2971                        is_true: *is_true,
2972                        metadata: metadata.clone(),
2973                    }
2974                }
2975            }
2976            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2977                inner: Box::new(inner.deep_clone(seen_tees)),
2978                metadata: metadata.clone(),
2979            },
2980            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2981                inner: Box::new(inner.deep_clone(seen_tees)),
2982                metadata: metadata.clone(),
2983            },
2984            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2985                inner: Box::new(inner.deep_clone(seen_tees)),
2986                metadata: metadata.clone(),
2987            },
2988            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2989                inner: Box::new(inner.deep_clone(seen_tees)),
2990                metadata: metadata.clone(),
2991            },
2992            HydroNode::Chain {
2993                first,
2994                second,
2995                metadata,
2996            } => HydroNode::Chain {
2997                first: Box::new(first.deep_clone(seen_tees)),
2998                second: Box::new(second.deep_clone(seen_tees)),
2999                metadata: metadata.clone(),
3000            },
3001            HydroNode::MergeOrdered {
3002                first,
3003                second,
3004                metadata,
3005            } => HydroNode::MergeOrdered {
3006                first: Box::new(first.deep_clone(seen_tees)),
3007                second: Box::new(second.deep_clone(seen_tees)),
3008                metadata: metadata.clone(),
3009            },
3010            HydroNode::ChainFirst {
3011                first,
3012                second,
3013                metadata,
3014            } => HydroNode::ChainFirst {
3015                first: Box::new(first.deep_clone(seen_tees)),
3016                second: Box::new(second.deep_clone(seen_tees)),
3017                metadata: metadata.clone(),
3018            },
3019            HydroNode::CrossProduct {
3020                left,
3021                right,
3022                metadata,
3023            } => HydroNode::CrossProduct {
3024                left: Box::new(left.deep_clone(seen_tees)),
3025                right: Box::new(right.deep_clone(seen_tees)),
3026                metadata: metadata.clone(),
3027            },
3028            HydroNode::CrossSingleton {
3029                left,
3030                right,
3031                metadata,
3032            } => HydroNode::CrossSingleton {
3033                left: Box::new(left.deep_clone(seen_tees)),
3034                right: Box::new(right.deep_clone(seen_tees)),
3035                metadata: metadata.clone(),
3036            },
3037            HydroNode::Join {
3038                left,
3039                right,
3040                metadata,
3041            } => HydroNode::Join {
3042                left: Box::new(left.deep_clone(seen_tees)),
3043                right: Box::new(right.deep_clone(seen_tees)),
3044                metadata: metadata.clone(),
3045            },
3046            HydroNode::JoinHalf {
3047                left,
3048                right,
3049                metadata,
3050            } => HydroNode::JoinHalf {
3051                left: Box::new(left.deep_clone(seen_tees)),
3052                right: Box::new(right.deep_clone(seen_tees)),
3053                metadata: metadata.clone(),
3054            },
3055            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3056                pos: Box::new(pos.deep_clone(seen_tees)),
3057                neg: Box::new(neg.deep_clone(seen_tees)),
3058                metadata: metadata.clone(),
3059            },
3060            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3061                pos: Box::new(pos.deep_clone(seen_tees)),
3062                neg: Box::new(neg.deep_clone(seen_tees)),
3063                metadata: metadata.clone(),
3064            },
3065            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3066                input: Box::new(input.deep_clone(seen_tees)),
3067                metadata: metadata.clone(),
3068            },
3069            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3070                HydroNode::ResolveFuturesBlocking {
3071                    input: Box::new(input.deep_clone(seen_tees)),
3072                    metadata: metadata.clone(),
3073                }
3074            }
3075            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3076                HydroNode::ResolveFuturesOrdered {
3077                    input: Box::new(input.deep_clone(seen_tees)),
3078                    metadata: metadata.clone(),
3079                }
3080            }
3081            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3082                f: f.deep_clone(seen_tees),
3083                input: Box::new(input.deep_clone(seen_tees)),
3084                metadata: metadata.clone(),
3085            },
3086            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3087                f: f.deep_clone(seen_tees),
3088                input: Box::new(input.deep_clone(seen_tees)),
3089                metadata: metadata.clone(),
3090            },
3091            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3092                HydroNode::FlatMapStreamBlocking {
3093                    f: f.deep_clone(seen_tees),
3094                    input: Box::new(input.deep_clone(seen_tees)),
3095                    metadata: metadata.clone(),
3096                }
3097            }
3098            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3099                f: f.deep_clone(seen_tees),
3100                input: Box::new(input.deep_clone(seen_tees)),
3101                metadata: metadata.clone(),
3102            },
3103            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3104                f: f.deep_clone(seen_tees),
3105                input: Box::new(input.deep_clone(seen_tees)),
3106                metadata: metadata.clone(),
3107            },
3108            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3109                input: Box::new(input.deep_clone(seen_tees)),
3110                metadata: metadata.clone(),
3111            },
3112            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3113                input: Box::new(input.deep_clone(seen_tees)),
3114                metadata: metadata.clone(),
3115            },
3116            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3117                f: f.deep_clone(seen_tees),
3118                input: Box::new(input.deep_clone(seen_tees)),
3119                metadata: metadata.clone(),
3120            },
3121            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3122                input: Box::new(input.deep_clone(seen_tees)),
3123                metadata: metadata.clone(),
3124            },
3125            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3126                input: Box::new(input.deep_clone(seen_tees)),
3127                metadata: metadata.clone(),
3128            },
3129            HydroNode::Fold {
3130                init,
3131                acc,
3132                input,
3133                metadata,
3134            } => HydroNode::Fold {
3135                init: init.deep_clone(seen_tees),
3136                acc: acc.deep_clone(seen_tees),
3137                input: Box::new(input.deep_clone(seen_tees)),
3138                metadata: metadata.clone(),
3139            },
3140            HydroNode::Scan {
3141                init,
3142                acc,
3143                input,
3144                metadata,
3145            } => HydroNode::Scan {
3146                init: init.deep_clone(seen_tees),
3147                acc: acc.deep_clone(seen_tees),
3148                input: Box::new(input.deep_clone(seen_tees)),
3149                metadata: metadata.clone(),
3150            },
3151            HydroNode::ScanAsyncBlocking {
3152                init,
3153                acc,
3154                input,
3155                metadata,
3156            } => HydroNode::ScanAsyncBlocking {
3157                init: init.deep_clone(seen_tees),
3158                acc: acc.deep_clone(seen_tees),
3159                input: Box::new(input.deep_clone(seen_tees)),
3160                metadata: metadata.clone(),
3161            },
3162            HydroNode::FoldKeyed {
3163                init,
3164                acc,
3165                input,
3166                metadata,
3167            } => HydroNode::FoldKeyed {
3168                init: init.deep_clone(seen_tees),
3169                acc: acc.deep_clone(seen_tees),
3170                input: Box::new(input.deep_clone(seen_tees)),
3171                metadata: metadata.clone(),
3172            },
3173            HydroNode::ReduceKeyedWatermark {
3174                f,
3175                input,
3176                watermark,
3177                metadata,
3178            } => HydroNode::ReduceKeyedWatermark {
3179                f: f.deep_clone(seen_tees),
3180                input: Box::new(input.deep_clone(seen_tees)),
3181                watermark: Box::new(watermark.deep_clone(seen_tees)),
3182                metadata: metadata.clone(),
3183            },
3184            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3185                f: f.deep_clone(seen_tees),
3186                input: Box::new(input.deep_clone(seen_tees)),
3187                metadata: metadata.clone(),
3188            },
3189            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3190                f: f.deep_clone(seen_tees),
3191                input: Box::new(input.deep_clone(seen_tees)),
3192                metadata: metadata.clone(),
3193            },
3194            HydroNode::Network {
3195                name,
3196                networking_info,
3197                serialize_fn,
3198                instantiate_fn,
3199                deserialize_fn,
3200                input,
3201                metadata,
3202            } => HydroNode::Network {
3203                name: name.clone(),
3204                networking_info: networking_info.clone(),
3205                serialize_fn: serialize_fn.clone(),
3206                instantiate_fn: instantiate_fn.clone(),
3207                deserialize_fn: deserialize_fn.clone(),
3208                input: Box::new(input.deep_clone(seen_tees)),
3209                metadata: metadata.clone(),
3210            },
3211            HydroNode::ExternalInput {
3212                from_external_key,
3213                from_port_id,
3214                from_many,
3215                codec_type,
3216                port_hint,
3217                instantiate_fn,
3218                deserialize_fn,
3219                metadata,
3220            } => HydroNode::ExternalInput {
3221                from_external_key: *from_external_key,
3222                from_port_id: *from_port_id,
3223                from_many: *from_many,
3224                codec_type: codec_type.clone(),
3225                port_hint: *port_hint,
3226                instantiate_fn: instantiate_fn.clone(),
3227                deserialize_fn: deserialize_fn.clone(),
3228                metadata: metadata.clone(),
3229            },
3230            HydroNode::Counter {
3231                tag,
3232                duration,
3233                prefix,
3234                input,
3235                metadata,
3236            } => HydroNode::Counter {
3237                tag: tag.clone(),
3238                duration: duration.clone(),
3239                prefix: prefix.clone(),
3240                input: Box::new(input.deep_clone(seen_tees)),
3241                metadata: metadata.clone(),
3242            },
3243        }
3244    }
3245
3246    #[cfg(feature = "build")]
3247    pub fn emit_core(
3248        &mut self,
3249        builders_or_callback: &mut BuildersOrCallback<
3250            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3251            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3252        >,
3253        seen_tees: &mut SeenSharedNodes,
3254        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3255        next_stmt_id: &mut crate::Counter<StmtId>,
3256        fold_hooked_idents: &mut HashSet<String>,
3257    ) -> syn::Ident {
3258        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3259
3260        self.transform_bottom_up(
3261            &mut |node: &mut HydroNode| {
3262                let out_location = node.metadata().location_id.clone();
3263                match node {
3264                    HydroNode::Placeholder => {
3265                        panic!()
3266                    }
3267
3268                    HydroNode::Cast { .. } => {
3269                        // Cast passes through the input ident unchanged
3270                        // The input ident is already on the stack from processing the child
3271                        let _ = next_stmt_id.get_and_increment();
3272                        match builders_or_callback {
3273                            BuildersOrCallback::Builders(_) => {}
3274                            BuildersOrCallback::Callback(_, node_callback) => {
3275                                node_callback(node, next_stmt_id);
3276                            }
3277                        }
3278                        // input_ident stays on stack as output
3279                    }
3280
3281                    HydroNode::UnboundSingleton { .. } => {
3282                        let inner_ident = ident_stack.pop().unwrap();
3283
3284                        let stmt_id = next_stmt_id.get_and_increment();
3285                        let out_ident =
3286                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3287
3288                        match builders_or_callback {
3289                            BuildersOrCallback::Builders(graph_builders) => {
3290                                if graph_builders.singleton_intermediates() {
3291                                    let builder = graph_builders.get_dfir_mut(&out_location);
3292                                    builder.add_dfir(
3293                                        parse_quote! {
3294                                            #out_ident = #inner_ident;
3295                                        },
3296                                        None,
3297                                        None,
3298                                    );
3299                                } else {
3300                                    let builder = graph_builders.get_dfir_mut(&out_location);
3301                                    builder.add_dfir(
3302                                        parse_quote! {
3303                                            #out_ident = #inner_ident -> persist::<'static>();
3304                                        },
3305                                        None,
3306                                        None,
3307                                    );
3308                                }
3309                            }
3310                            BuildersOrCallback::Callback(_, node_callback) => {
3311                                node_callback(node, next_stmt_id);
3312                            }
3313                        }
3314
3315                        ident_stack.push(out_ident);
3316                    }
3317
3318                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3319                        let inner_ident = ident_stack.pop().unwrap();
3320
3321                        let stmt_id = next_stmt_id.get_and_increment();
3322                        let out_ident =
3323                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3324
3325                        match builders_or_callback {
3326                            BuildersOrCallback::Builders(graph_builders) => {
3327                                graph_builders.assert_is_consistent(
3328                                    *trusted,
3329                                    &inner.metadata().location_id,
3330                                    inner_ident,
3331                                    &out_ident,
3332                                );
3333                            }
3334                            BuildersOrCallback::Callback(_, node_callback) => {
3335                                node_callback(node, next_stmt_id);
3336                            }
3337                        }
3338
3339                        ident_stack.push(out_ident);
3340                    }
3341
3342                    HydroNode::ObserveNonDet {
3343                        inner,
3344                        trusted,
3345                        metadata,
3346                        ..
3347                    } => {
3348                        let inner_ident = ident_stack.pop().unwrap();
3349
3350                        let stmt_id = next_stmt_id.get_and_increment();
3351                        let observe_ident =
3352                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3353
3354                        match builders_or_callback {
3355                            BuildersOrCallback::Builders(graph_builders) => {
3356                                graph_builders.observe_nondet(
3357                                    *trusted,
3358                                    &inner.metadata().location_id,
3359                                    inner_ident,
3360                                    &inner.metadata().collection_kind,
3361                                    &observe_ident,
3362                                    &metadata.collection_kind,
3363                                    &metadata.op,
3364                                );
3365                            }
3366                            BuildersOrCallback::Callback(_, node_callback) => {
3367                                node_callback(node, next_stmt_id);
3368                            }
3369                        }
3370
3371                        ident_stack.push(observe_ident);
3372                    }
3373
3374                    HydroNode::Batch {
3375                        inner, metadata, ..
3376                    } => {
3377                        let inner_ident = ident_stack.pop().unwrap();
3378
3379                        let stmt_id = next_stmt_id.get_and_increment();
3380                        let batch_ident =
3381                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3382
3383                        match builders_or_callback {
3384                            BuildersOrCallback::Builders(graph_builders) => {
3385                                graph_builders.batch(
3386                                    inner_ident,
3387                                    &inner.metadata().location_id,
3388                                    &inner.metadata().collection_kind,
3389                                    &batch_ident,
3390                                    &out_location,
3391                                    &metadata.op,
3392                                    fold_hooked_idents,
3393                                );
3394                            }
3395                            BuildersOrCallback::Callback(_, node_callback) => {
3396                                node_callback(node, next_stmt_id);
3397                            }
3398                        }
3399
3400                        ident_stack.push(batch_ident);
3401                    }
3402
3403                    HydroNode::YieldConcat { inner, .. } => {
3404                        let inner_ident = ident_stack.pop().unwrap();
3405
3406                        let stmt_id = next_stmt_id.get_and_increment();
3407                        let yield_ident =
3408                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3409
3410                        match builders_or_callback {
3411                            BuildersOrCallback::Builders(graph_builders) => {
3412                                graph_builders.yield_from_tick(
3413                                    inner_ident,
3414                                    &inner.metadata().location_id,
3415                                    &inner.metadata().collection_kind,
3416                                    &yield_ident,
3417                                    &out_location,
3418                                );
3419                            }
3420                            BuildersOrCallback::Callback(_, node_callback) => {
3421                                node_callback(node, next_stmt_id);
3422                            }
3423                        }
3424
3425                        ident_stack.push(yield_ident);
3426                    }
3427
3428                    HydroNode::BeginAtomic { inner, metadata } => {
3429                        let inner_ident = ident_stack.pop().unwrap();
3430
3431                        let stmt_id = next_stmt_id.get_and_increment();
3432                        let begin_ident =
3433                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3434
3435                        match builders_or_callback {
3436                            BuildersOrCallback::Builders(graph_builders) => {
3437                                graph_builders.begin_atomic(
3438                                    inner_ident,
3439                                    &inner.metadata().location_id,
3440                                    &inner.metadata().collection_kind,
3441                                    &begin_ident,
3442                                    &out_location,
3443                                    &metadata.op,
3444                                );
3445                            }
3446                            BuildersOrCallback::Callback(_, node_callback) => {
3447                                node_callback(node, next_stmt_id);
3448                            }
3449                        }
3450
3451                        ident_stack.push(begin_ident);
3452                    }
3453
3454                    HydroNode::EndAtomic { inner, .. } => {
3455                        let inner_ident = ident_stack.pop().unwrap();
3456
3457                        let stmt_id = next_stmt_id.get_and_increment();
3458                        let end_ident =
3459                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3460
3461                        match builders_or_callback {
3462                            BuildersOrCallback::Builders(graph_builders) => {
3463                                graph_builders.end_atomic(
3464                                    inner_ident,
3465                                    &inner.metadata().location_id,
3466                                    &inner.metadata().collection_kind,
3467                                    &end_ident,
3468                                );
3469                            }
3470                            BuildersOrCallback::Callback(_, node_callback) => {
3471                                node_callback(node, next_stmt_id);
3472                            }
3473                        }
3474
3475                        ident_stack.push(end_ident);
3476                    }
3477
3478                    HydroNode::Source {
3479                        source, metadata, ..
3480                    } => {
3481                        if let HydroSource::ExternalNetwork() = source {
3482                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3483                        } else {
3484                            let stmt_id = next_stmt_id.get_and_increment();
3485                            let source_ident =
3486                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3487
3488                            let source_stmt = match source {
3489                                HydroSource::Stream(expr) => {
3490                                    debug_assert!(metadata.location_id.is_top_level());
3491                                    parse_quote! {
3492                                        #source_ident = source_stream(#expr);
3493                                    }
3494                                }
3495
3496                                HydroSource::ExternalNetwork() => {
3497                                    unreachable!()
3498                                }
3499
3500                                HydroSource::Iter(expr) => {
3501                                    if metadata.location_id.is_top_level() {
3502                                        parse_quote! {
3503                                            #source_ident = source_iter(#expr);
3504                                        }
3505                                    } else {
3506                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3507                                        parse_quote! {
3508                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3509                                        }
3510                                    }
3511                                }
3512
3513                                HydroSource::Spin() => {
3514                                    debug_assert!(metadata.location_id.is_top_level());
3515                                    parse_quote! {
3516                                        #source_ident = spin();
3517                                    }
3518                                }
3519
3520                                HydroSource::ClusterMembers(target_loc, state) => {
3521                                    debug_assert!(metadata.location_id.is_top_level());
3522
3523                                    let members_tee_ident = syn::Ident::new(
3524                                        &format!(
3525                                            "__cluster_members_tee_{}_{}",
3526                                            metadata.location_id.root().key(),
3527                                            target_loc.key(),
3528                                        ),
3529                                        Span::call_site(),
3530                                    );
3531
3532                                    match state {
3533                                        ClusterMembersState::Stream(d) => {
3534                                            parse_quote! {
3535                                                #members_tee_ident = source_stream(#d) -> tee();
3536                                                #source_ident = #members_tee_ident;
3537                                            }
3538                                        },
3539                                        ClusterMembersState::Uninit => syn::parse_quote! {
3540                                            #source_ident = source_stream(DUMMY);
3541                                        },
3542                                        ClusterMembersState::Tee(..) => parse_quote! {
3543                                            #source_ident = #members_tee_ident;
3544                                        },
3545                                    }
3546                                }
3547
3548                                HydroSource::Embedded(ident) => {
3549                                    parse_quote! {
3550                                        #source_ident = source_stream(#ident);
3551                                    }
3552                                }
3553
3554                                HydroSource::EmbeddedSingleton(ident) => {
3555                                    parse_quote! {
3556                                        #source_ident = source_iter([#ident]);
3557                                    }
3558                                }
3559                            };
3560
3561                            match builders_or_callback {
3562                                BuildersOrCallback::Builders(graph_builders) => {
3563                                    let builder = graph_builders.get_dfir_mut(&out_location);
3564                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3565                                }
3566                                BuildersOrCallback::Callback(_, node_callback) => {
3567                                    node_callback(node, next_stmt_id);
3568                                }
3569                            }
3570
3571                            ident_stack.push(source_ident);
3572                        }
3573                    }
3574
3575                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3576                        let stmt_id = next_stmt_id.get_and_increment();
3577                        let source_ident =
3578                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3579
3580                        match builders_or_callback {
3581                            BuildersOrCallback::Builders(graph_builders) => {
3582                                let builder = graph_builders.get_dfir_mut(&out_location);
3583
3584                                if *first_tick_only {
3585                                    assert!(
3586                                        !metadata.location_id.is_top_level(),
3587                                        "first_tick_only SingletonSource must be inside a tick"
3588                                    );
3589                                }
3590
3591                                if *first_tick_only
3592                                    || (metadata.location_id.is_top_level()
3593                                        && metadata.collection_kind.is_bounded())
3594                                {
3595                                    builder.add_dfir(
3596                                        parse_quote! {
3597                                            #source_ident = source_iter([#value]);
3598                                        },
3599                                        None,
3600                                        Some(&stmt_id.to_string()),
3601                                    );
3602                                } else {
3603                                    builder.add_dfir(
3604                                        parse_quote! {
3605                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3606                                        },
3607                                        None,
3608                                        Some(&stmt_id.to_string()),
3609                                    );
3610                                }
3611                            }
3612                            BuildersOrCallback::Callback(_, node_callback) => {
3613                                node_callback(node, next_stmt_id);
3614                            }
3615                        }
3616
3617                        ident_stack.push(source_ident);
3618                    }
3619
3620                    HydroNode::CycleSource { cycle_id, .. } => {
3621                        let ident = cycle_id.as_ident();
3622
3623                        // consume a stmt id even though we did not emit anything so that we can instrument this
3624                        let _ = next_stmt_id.get_and_increment();
3625
3626                        match builders_or_callback {
3627                            BuildersOrCallback::Builders(_) => {}
3628                            BuildersOrCallback::Callback(_, node_callback) => {
3629                                node_callback(node, next_stmt_id);
3630                            }
3631                        }
3632
3633                        ident_stack.push(ident);
3634                    }
3635
3636                    HydroNode::Tee { inner, .. } => {
3637                        // we consume a stmt id regardless of if we emit the tee() operator,
3638                        // so that during rewrites we touch all recipients of the tee()
3639                        let stmt_id = next_stmt_id.get_and_increment();
3640
3641                        let ret_ident = if let Some(built_idents) =
3642                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3643                        {
3644                            match builders_or_callback {
3645                                BuildersOrCallback::Builders(_) => {}
3646                                BuildersOrCallback::Callback(_, node_callback) => {
3647                                    node_callback(node, next_stmt_id);
3648                                }
3649                            }
3650
3651                            built_idents[0].clone()
3652                        } else {
3653                            // The inner node was already processed by transform_bottom_up,
3654                            // so its ident is on the stack
3655                            let inner_ident = ident_stack.pop().unwrap();
3656
3657                            let tee_ident =
3658                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3659
3660                            built_tees.insert(
3661                                inner.0.as_ref() as *const RefCell<HydroNode>,
3662                                vec![tee_ident.clone()],
3663                            );
3664
3665                            match builders_or_callback {
3666                                BuildersOrCallback::Builders(graph_builders) => {
3667                                    // NOTE: With `forward_ref`, the fold codegen may not have
3668                                    // run yet when we reach this tee, so `fold_hooked_idents`
3669                                    // might not contain the inner ident. In that case we won't
3670                                    // propagate the "hooked" status to the tee and the
3671                                    // downstream singleton batch will use the normal
3672                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3673                                    // This is not a soundness issue: the fallback hook still
3674                                    // produces correct behavior, just with a redundant decision
3675                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3676                                    // fix ordering so forward_ref folds are always processed
3677                                    // before their downstream tees.
3678                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3679                                        fold_hooked_idents.insert(tee_ident.to_string());
3680                                    }
3681                                    let builder = graph_builders.get_dfir_mut(&out_location);
3682                                    builder.add_dfir(
3683                                        parse_quote! {
3684                                            #tee_ident = #inner_ident -> tee();
3685                                        },
3686                                        None,
3687                                        Some(&stmt_id.to_string()),
3688                                    );
3689                                }
3690                                BuildersOrCallback::Callback(_, node_callback) => {
3691                                    node_callback(node, next_stmt_id);
3692                                }
3693                            }
3694
3695                            tee_ident
3696                        };
3697
3698                        ident_stack.push(ret_ident);
3699                    }
3700
3701                    HydroNode::Reference { inner, kind, .. } => {
3702                        // we consume a stmt id regardless of if we emit the operator,
3703                        // so that during rewrites we touch all recipients
3704                        let stmt_id = next_stmt_id.get_and_increment();
3705
3706                        let ret_ident = if let Some(built_idents) =
3707                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3708                        {
3709                            built_idents[0].clone()
3710                        } else {
3711                            let inner_ident = ident_stack.pop().unwrap();
3712
3713                            let ref_ident =
3714                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3715
3716                            built_tees.insert(
3717                                inner.0.as_ref() as *const RefCell<HydroNode>,
3718                                vec![ref_ident.clone()],
3719                            );
3720
3721                            match builders_or_callback {
3722                                BuildersOrCallback::Builders(graph_builders) => {
3723                                    let builder = graph_builders.get_dfir_mut(&out_location);
3724                                    let op_ident = syn::Ident::new(
3725                                        match kind {
3726                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3727                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3728                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3729                                        },
3730                                        Span::call_site(),
3731                                    );
3732                                    builder.add_dfir(
3733                                        parse_quote! {
3734                                            #ref_ident = #inner_ident -> #op_ident();
3735                                        },
3736                                        None,
3737                                        Some(&stmt_id.to_string()),
3738                                    );
3739                                }
3740                                BuildersOrCallback::Callback(_, node_callback) => {
3741                                    node_callback(node, next_stmt_id);
3742                                }
3743                            }
3744
3745                            ref_ident
3746                        };
3747
3748                        ident_stack.push(ret_ident);
3749                    }
3750
3751                    HydroNode::Partition {
3752                        inner, f, is_true, metadata,
3753                    } => {
3754                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3755                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3756                        let stmt_id = next_stmt_id.get_and_increment();
3757
3758                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3759                            match builders_or_callback {
3760                                BuildersOrCallback::Builders(_) => {}
3761                                BuildersOrCallback::Callback(_, node_callback) => {
3762                                    node_callback(node, next_stmt_id);
3763                                }
3764                            }
3765
3766                            let idx = if is_true { 0 } else { 1 };
3767                            built_idents[idx].clone()
3768                        } else {
3769                            // The inner node was already processed by transform_bottom_up,
3770                            // so its ident is on the stack
3771                            let inner_ident = ident_stack.pop().unwrap();
3772                            let f_tokens = f.emit_tokens(&mut ident_stack);
3773
3774                            let inner_ident = {
3775                                let inner_borrow = inner.0.borrow();
3776                                maybe_observe_for_mut(
3777                                    f, inner_ident,
3778                                    &inner_borrow.metadata().location_id,
3779                                    &inner_borrow.metadata().collection_kind,
3780                                    &metadata.op,
3781                                    builders_or_callback, next_stmt_id,
3782                                )
3783                            };
3784
3785                            let partition_ident = syn::Ident::new(
3786                                &format!("stream_{}_partition", stmt_id),
3787                                Span::call_site(),
3788                            );
3789                            let true_ident = syn::Ident::new(
3790                                &format!("stream_{}_true", stmt_id),
3791                                Span::call_site(),
3792                            );
3793                            let false_ident = syn::Ident::new(
3794                                &format!("stream_{}_false", stmt_id),
3795                                Span::call_site(),
3796                            );
3797
3798                            built_tees.insert(
3799                                ptr,
3800                                vec![true_ident.clone(), false_ident.clone()],
3801                            );
3802
3803                            let stmt_id = next_stmt_id.get_and_increment();
3804                            match builders_or_callback {
3805                                BuildersOrCallback::Builders(graph_builders) => {
3806                                    let builder = graph_builders.get_dfir_mut(&out_location);
3807                                    builder.add_dfir(
3808                                        parse_quote! {
3809                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3810                                            #true_ident = #partition_ident[0];
3811                                            #false_ident = #partition_ident[1];
3812                                        },
3813                                        None,
3814                                        Some(&stmt_id.to_string()),
3815                                    );
3816                                }
3817                                BuildersOrCallback::Callback(_, node_callback) => {
3818                                    node_callback(node, next_stmt_id);
3819                                }
3820                            }
3821
3822                            if is_true { true_ident } else { false_ident }
3823                        };
3824
3825                        ident_stack.push(ret_ident);
3826                    }
3827
3828                    HydroNode::Chain { .. } => {
3829                        // Children are processed left-to-right, so second is on top
3830                        let second_ident = ident_stack.pop().unwrap();
3831                        let first_ident = ident_stack.pop().unwrap();
3832
3833                        let stmt_id = next_stmt_id.get_and_increment();
3834                        let chain_ident =
3835                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3836
3837                        match builders_or_callback {
3838                            BuildersOrCallback::Builders(graph_builders) => {
3839                                let builder = graph_builders.get_dfir_mut(&out_location);
3840                                builder.add_dfir(
3841                                    parse_quote! {
3842                                        #chain_ident = chain();
3843                                        #first_ident -> [0]#chain_ident;
3844                                        #second_ident -> [1]#chain_ident;
3845                                    },
3846                                    None,
3847                                    Some(&stmt_id.to_string()),
3848                                );
3849                            }
3850                            BuildersOrCallback::Callback(_, node_callback) => {
3851                                node_callback(node, next_stmt_id);
3852                            }
3853                        }
3854
3855                        ident_stack.push(chain_ident);
3856                    }
3857
3858                    HydroNode::MergeOrdered { first, metadata, .. } => {
3859                        let second_ident = ident_stack.pop().unwrap();
3860                        let first_ident = ident_stack.pop().unwrap();
3861
3862                        let stmt_id = next_stmt_id.get_and_increment();
3863                        let merge_ident =
3864                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3865
3866                        match builders_or_callback {
3867                            BuildersOrCallback::Builders(graph_builders) => {
3868                                graph_builders.merge_ordered(
3869                                    &first.metadata().location_id,
3870                                    first_ident,
3871                                    second_ident,
3872                                    &merge_ident,
3873                                    &first.metadata().collection_kind,
3874                                    &metadata.op,
3875                                    Some(&stmt_id.to_string()),
3876                                );
3877                            }
3878                            BuildersOrCallback::Callback(_, node_callback) => {
3879                                node_callback(node, next_stmt_id);
3880                            }
3881                        }
3882
3883                        ident_stack.push(merge_ident);
3884                    }
3885
3886                    HydroNode::ChainFirst { .. } => {
3887                        let second_ident = ident_stack.pop().unwrap();
3888                        let first_ident = ident_stack.pop().unwrap();
3889
3890                        let stmt_id = next_stmt_id.get_and_increment();
3891                        let chain_ident =
3892                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3893
3894                        match builders_or_callback {
3895                            BuildersOrCallback::Builders(graph_builders) => {
3896                                let builder = graph_builders.get_dfir_mut(&out_location);
3897                                builder.add_dfir(
3898                                    parse_quote! {
3899                                        #chain_ident = chain_first_n(1);
3900                                        #first_ident -> [0]#chain_ident;
3901                                        #second_ident -> [1]#chain_ident;
3902                                    },
3903                                    None,
3904                                    Some(&stmt_id.to_string()),
3905                                );
3906                            }
3907                            BuildersOrCallback::Callback(_, node_callback) => {
3908                                node_callback(node, next_stmt_id);
3909                            }
3910                        }
3911
3912                        ident_stack.push(chain_ident);
3913                    }
3914
3915                    HydroNode::CrossSingleton { right, .. } => {
3916                        let right_ident = ident_stack.pop().unwrap();
3917                        let left_ident = ident_stack.pop().unwrap();
3918
3919                        let stmt_id = next_stmt_id.get_and_increment();
3920                        let cross_ident =
3921                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3922
3923                        match builders_or_callback {
3924                            BuildersOrCallback::Builders(graph_builders) => {
3925                                let builder = graph_builders.get_dfir_mut(&out_location);
3926
3927                                if right.metadata().location_id.is_top_level()
3928                                    && right.metadata().collection_kind.is_bounded()
3929                                {
3930                                    builder.add_dfir(
3931                                        parse_quote! {
3932                                            #cross_ident = cross_singleton::<'static>();
3933                                            #left_ident -> [input]#cross_ident;
3934                                            #right_ident -> [single]#cross_ident;
3935                                        },
3936                                        None,
3937                                        Some(&stmt_id.to_string()),
3938                                    );
3939                                } else {
3940                                    builder.add_dfir(
3941                                        parse_quote! {
3942                                            #cross_ident = cross_singleton();
3943                                            #left_ident -> [input]#cross_ident;
3944                                            #right_ident -> [single]#cross_ident;
3945                                        },
3946                                        None,
3947                                        Some(&stmt_id.to_string()),
3948                                    );
3949                                }
3950                            }
3951                            BuildersOrCallback::Callback(_, node_callback) => {
3952                                node_callback(node, next_stmt_id);
3953                            }
3954                        }
3955
3956                        ident_stack.push(cross_ident);
3957                    }
3958
3959                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3960                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3961                            parse_quote!(cross_join_multiset)
3962                        } else {
3963                            parse_quote!(join_multiset)
3964                        };
3965
3966                        let (HydroNode::CrossProduct { left, right, .. }
3967                        | HydroNode::Join { left, right, .. }) = node
3968                        else {
3969                            unreachable!()
3970                        };
3971
3972                        let is_top_level = left.metadata().location_id.is_top_level()
3973                            && right.metadata().location_id.is_top_level();
3974                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3975                            quote!('static)
3976                        } else {
3977                            quote!('tick)
3978                        };
3979
3980                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3981                            quote!('static)
3982                        } else {
3983                            quote!('tick)
3984                        };
3985
3986                        let right_ident = ident_stack.pop().unwrap();
3987                        let left_ident = ident_stack.pop().unwrap();
3988
3989                        let stmt_id = next_stmt_id.get_and_increment();
3990                        let stream_ident =
3991                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3992
3993                        match builders_or_callback {
3994                            BuildersOrCallback::Builders(graph_builders) => {
3995                                let builder = graph_builders.get_dfir_mut(&out_location);
3996                                builder.add_dfir(
3997                                    if is_top_level {
3998                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3999                                        // a multiset_delta() to negate the replay behavior
4000                                        parse_quote! {
4001                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4002                                            #left_ident -> [0]#stream_ident;
4003                                            #right_ident -> [1]#stream_ident;
4004                                        }
4005                                    } else {
4006                                        parse_quote! {
4007                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4008                                            #left_ident -> [0]#stream_ident;
4009                                            #right_ident -> [1]#stream_ident;
4010                                        }
4011                                    }
4012                                    ,
4013                                    None,
4014                                    Some(&stmt_id.to_string()),
4015                                );
4016                            }
4017                            BuildersOrCallback::Callback(_, node_callback) => {
4018                                node_callback(node, next_stmt_id);
4019                            }
4020                        }
4021
4022                        ident_stack.push(stream_ident);
4023                    }
4024
4025                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4026                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4027                            parse_quote!(difference)
4028                        } else {
4029                            parse_quote!(anti_join)
4030                        };
4031
4032                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4033                            node
4034                        else {
4035                            unreachable!()
4036                        };
4037
4038                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4039                            quote!('static)
4040                        } else {
4041                            quote!('tick)
4042                        };
4043
4044                        let neg_ident = ident_stack.pop().unwrap();
4045                        let pos_ident = ident_stack.pop().unwrap();
4046
4047                        let stmt_id = next_stmt_id.get_and_increment();
4048                        let stream_ident =
4049                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4050
4051                        match builders_or_callback {
4052                            BuildersOrCallback::Builders(graph_builders) => {
4053                                let builder = graph_builders.get_dfir_mut(&out_location);
4054                                builder.add_dfir(
4055                                    parse_quote! {
4056                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4057                                        #pos_ident -> [pos]#stream_ident;
4058                                        #neg_ident -> [neg]#stream_ident;
4059                                    },
4060                                    None,
4061                                    Some(&stmt_id.to_string()),
4062                                );
4063                            }
4064                            BuildersOrCallback::Callback(_, node_callback) => {
4065                                node_callback(node, next_stmt_id);
4066                            }
4067                        }
4068
4069                        ident_stack.push(stream_ident);
4070                    }
4071
4072                    HydroNode::JoinHalf { .. } => {
4073                        let HydroNode::JoinHalf { right, .. } = node else {
4074                            unreachable!()
4075                        };
4076
4077                        assert!(
4078                            right.metadata().collection_kind.is_bounded(),
4079                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4080                            right.metadata().collection_kind
4081                        );
4082
4083                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4084                            quote!('static)
4085                        } else {
4086                            quote!('tick)
4087                        };
4088
4089                        let build_ident = ident_stack.pop().unwrap();
4090                        let probe_ident = ident_stack.pop().unwrap();
4091
4092                        let stmt_id = next_stmt_id.get_and_increment();
4093                        let stream_ident =
4094                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4095
4096                        match builders_or_callback {
4097                            BuildersOrCallback::Builders(graph_builders) => {
4098                                let builder = graph_builders.get_dfir_mut(&out_location);
4099                                builder.add_dfir(
4100                                    parse_quote! {
4101                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4102                                        #probe_ident -> [probe]#stream_ident;
4103                                        #build_ident -> [build]#stream_ident;
4104                                    },
4105                                    None,
4106                                    Some(&stmt_id.to_string()),
4107                                );
4108                            }
4109                            BuildersOrCallback::Callback(_, node_callback) => {
4110                                node_callback(node, next_stmt_id);
4111                            }
4112                        }
4113
4114                        ident_stack.push(stream_ident);
4115                    }
4116
4117                    HydroNode::ResolveFutures { .. } => {
4118                        let input_ident = ident_stack.pop().unwrap();
4119
4120                        let stmt_id = next_stmt_id.get_and_increment();
4121                        let futures_ident =
4122                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4123
4124                        match builders_or_callback {
4125                            BuildersOrCallback::Builders(graph_builders) => {
4126                                let builder = graph_builders.get_dfir_mut(&out_location);
4127                                builder.add_dfir(
4128                                    parse_quote! {
4129                                        #futures_ident = #input_ident -> resolve_futures();
4130                                    },
4131                                    None,
4132                                    Some(&stmt_id.to_string()),
4133                                );
4134                            }
4135                            BuildersOrCallback::Callback(_, node_callback) => {
4136                                node_callback(node, next_stmt_id);
4137                            }
4138                        }
4139
4140                        ident_stack.push(futures_ident);
4141                    }
4142
4143                    HydroNode::ResolveFuturesBlocking { .. } => {
4144                        let input_ident = ident_stack.pop().unwrap();
4145
4146                        let stmt_id = next_stmt_id.get_and_increment();
4147                        let futures_ident =
4148                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4149
4150                        match builders_or_callback {
4151                            BuildersOrCallback::Builders(graph_builders) => {
4152                                let builder = graph_builders.get_dfir_mut(&out_location);
4153                                builder.add_dfir(
4154                                    parse_quote! {
4155                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4156                                    },
4157                                    None,
4158                                    Some(&stmt_id.to_string()),
4159                                );
4160                            }
4161                            BuildersOrCallback::Callback(_, node_callback) => {
4162                                node_callback(node, next_stmt_id);
4163                            }
4164                        }
4165
4166                        ident_stack.push(futures_ident);
4167                    }
4168
4169                    HydroNode::ResolveFuturesOrdered { .. } => {
4170                        let input_ident = ident_stack.pop().unwrap();
4171
4172                        let stmt_id = next_stmt_id.get_and_increment();
4173                        let futures_ident =
4174                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4175
4176                        match builders_or_callback {
4177                            BuildersOrCallback::Builders(graph_builders) => {
4178                                let builder = graph_builders.get_dfir_mut(&out_location);
4179                                builder.add_dfir(
4180                                    parse_quote! {
4181                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4182                                    },
4183                                    None,
4184                                    Some(&stmt_id.to_string()),
4185                                );
4186                            }
4187                            BuildersOrCallback::Callback(_, node_callback) => {
4188                                node_callback(node, next_stmt_id);
4189                            }
4190                        }
4191
4192                        ident_stack.push(futures_ident);
4193                    }
4194
4195                    HydroNode::Map {
4196                        f,
4197                        input,
4198                        metadata,
4199                    } => {
4200                        // Pop input ident (pushed last by transform_children).
4201                        let input_ident = ident_stack.pop().unwrap();
4202                        let f_tokens = f.emit_tokens(&mut ident_stack);
4203
4204                        let input_ident = maybe_observe_for_mut(
4205                            f,
4206                            input_ident,
4207                            &input.metadata().location_id,
4208                            &input.metadata().collection_kind,
4209                            &metadata.op,
4210                            builders_or_callback,
4211                            next_stmt_id,
4212                        );
4213
4214                        let stmt_id = next_stmt_id.get_and_increment();
4215                        let map_ident =
4216                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4217
4218                        match builders_or_callback {
4219                            BuildersOrCallback::Builders(graph_builders) => {
4220                                let builder = graph_builders.get_dfir_mut(&out_location);
4221                                builder.add_dfir(
4222                                    parse_quote! {
4223                                        #map_ident = #input_ident -> map(#f_tokens);
4224                                    },
4225                                    None,
4226                                    Some(&stmt_id.to_string()),
4227                                );
4228                            }
4229                            BuildersOrCallback::Callback(_, node_callback) => {
4230                                node_callback(node, next_stmt_id);
4231                            }
4232                        }
4233
4234                        ident_stack.push(map_ident);
4235                    }
4236
4237                    HydroNode::FlatMap { f, input, metadata } => {
4238                        let input_ident = ident_stack.pop().unwrap();
4239                        let f_tokens = f.emit_tokens(&mut ident_stack);
4240
4241                        let input_ident = maybe_observe_for_mut(
4242                            f, input_ident,
4243                            &input.metadata().location_id,
4244                            &input.metadata().collection_kind,
4245                            &metadata.op,
4246                            builders_or_callback, next_stmt_id,
4247                        );
4248
4249                        let stmt_id = next_stmt_id.get_and_increment();
4250                        let flat_map_ident =
4251                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4252
4253                        match builders_or_callback {
4254                            BuildersOrCallback::Builders(graph_builders) => {
4255                                let builder = graph_builders.get_dfir_mut(&out_location);
4256                                builder.add_dfir(
4257                                    parse_quote! {
4258                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4259                                    },
4260                                    None,
4261                                    Some(&stmt_id.to_string()),
4262                                );
4263                            }
4264                            BuildersOrCallback::Callback(_, node_callback) => {
4265                                node_callback(node, next_stmt_id);
4266                            }
4267                        }
4268
4269                        ident_stack.push(flat_map_ident);
4270                    }
4271
4272                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4273                        let input_ident = ident_stack.pop().unwrap();
4274                        let f_tokens = f.emit_tokens(&mut ident_stack);
4275
4276                        let input_ident = maybe_observe_for_mut(
4277                            f, input_ident,
4278                            &input.metadata().location_id,
4279                            &input.metadata().collection_kind,
4280                            &metadata.op,
4281                            builders_or_callback, next_stmt_id,
4282                        );
4283
4284                        let stmt_id = next_stmt_id.get_and_increment();
4285                        let flat_map_stream_blocking_ident =
4286                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4287
4288                        match builders_or_callback {
4289                            BuildersOrCallback::Builders(graph_builders) => {
4290                                let builder = graph_builders.get_dfir_mut(&out_location);
4291                                builder.add_dfir(
4292                                    parse_quote! {
4293                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4294                                    },
4295                                    None,
4296                                    Some(&stmt_id.to_string()),
4297                                );
4298                            }
4299                            BuildersOrCallback::Callback(_, node_callback) => {
4300                                node_callback(node, next_stmt_id);
4301                            }
4302                        }
4303
4304                        ident_stack.push(flat_map_stream_blocking_ident);
4305                    }
4306
4307                    HydroNode::Filter { f, input, metadata } => {
4308                        let input_ident = ident_stack.pop().unwrap();
4309                        let f_tokens = f.emit_tokens(&mut ident_stack);
4310
4311                        let input_ident = maybe_observe_for_mut(
4312                            f, input_ident,
4313                            &input.metadata().location_id,
4314                            &input.metadata().collection_kind,
4315                            &metadata.op,
4316                            builders_or_callback, next_stmt_id,
4317                        );
4318
4319                        let stmt_id = next_stmt_id.get_and_increment();
4320                        let filter_ident =
4321                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4322
4323                        match builders_or_callback {
4324                            BuildersOrCallback::Builders(graph_builders) => {
4325                                let builder = graph_builders.get_dfir_mut(&out_location);
4326                                builder.add_dfir(
4327                                    parse_quote! {
4328                                        #filter_ident = #input_ident -> filter(#f_tokens);
4329                                    },
4330                                    None,
4331                                    Some(&stmt_id.to_string()),
4332                                );
4333                            }
4334                            BuildersOrCallback::Callback(_, node_callback) => {
4335                                node_callback(node, next_stmt_id);
4336                            }
4337                        }
4338
4339                        ident_stack.push(filter_ident);
4340                    }
4341
4342                    HydroNode::FilterMap { f, input, metadata } => {
4343                        let input_ident = ident_stack.pop().unwrap();
4344                        let f_tokens = f.emit_tokens(&mut ident_stack);
4345
4346                        let input_ident = maybe_observe_for_mut(
4347                            f, input_ident,
4348                            &input.metadata().location_id,
4349                            &input.metadata().collection_kind,
4350                            &metadata.op,
4351                            builders_or_callback, next_stmt_id,
4352                        );
4353
4354                        let stmt_id = next_stmt_id.get_and_increment();
4355                        let filter_map_ident =
4356                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4357
4358                        match builders_or_callback {
4359                            BuildersOrCallback::Builders(graph_builders) => {
4360                                let builder = graph_builders.get_dfir_mut(&out_location);
4361                                builder.add_dfir(
4362                                    parse_quote! {
4363                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4364                                    },
4365                                    None,
4366                                    Some(&stmt_id.to_string()),
4367                                );
4368                            }
4369                            BuildersOrCallback::Callback(_, node_callback) => {
4370                                node_callback(node, next_stmt_id);
4371                            }
4372                        }
4373
4374                        ident_stack.push(filter_map_ident);
4375                    }
4376
4377                    HydroNode::Sort { .. } => {
4378                        let input_ident = ident_stack.pop().unwrap();
4379
4380                        let stmt_id = next_stmt_id.get_and_increment();
4381                        let sort_ident =
4382                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4383
4384                        match builders_or_callback {
4385                            BuildersOrCallback::Builders(graph_builders) => {
4386                                let builder = graph_builders.get_dfir_mut(&out_location);
4387                                builder.add_dfir(
4388                                    parse_quote! {
4389                                        #sort_ident = #input_ident -> sort();
4390                                    },
4391                                    None,
4392                                    Some(&stmt_id.to_string()),
4393                                );
4394                            }
4395                            BuildersOrCallback::Callback(_, node_callback) => {
4396                                node_callback(node, next_stmt_id);
4397                            }
4398                        }
4399
4400                        ident_stack.push(sort_ident);
4401                    }
4402
4403                    HydroNode::DeferTick { .. } => {
4404                        let input_ident = ident_stack.pop().unwrap();
4405
4406                        let stmt_id = next_stmt_id.get_and_increment();
4407                        let defer_tick_ident =
4408                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4409
4410                        match builders_or_callback {
4411                            BuildersOrCallback::Builders(graph_builders) => {
4412                                let builder = graph_builders.get_dfir_mut(&out_location);
4413                                builder.add_dfir(
4414                                    parse_quote! {
4415                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4416                                    },
4417                                    None,
4418                                    Some(&stmt_id.to_string()),
4419                                );
4420                            }
4421                            BuildersOrCallback::Callback(_, node_callback) => {
4422                                node_callback(node, next_stmt_id);
4423                            }
4424                        }
4425
4426                        ident_stack.push(defer_tick_ident);
4427                    }
4428
4429                    HydroNode::Enumerate { input, .. } => {
4430                        let input_ident = ident_stack.pop().unwrap();
4431
4432                        let stmt_id = next_stmt_id.get_and_increment();
4433                        let enumerate_ident =
4434                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4435
4436                        match builders_or_callback {
4437                            BuildersOrCallback::Builders(graph_builders) => {
4438                                let builder = graph_builders.get_dfir_mut(&out_location);
4439                                let lifetime = if input.metadata().location_id.is_top_level() {
4440                                    quote!('static)
4441                                } else {
4442                                    quote!('tick)
4443                                };
4444                                builder.add_dfir(
4445                                    parse_quote! {
4446                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4447                                    },
4448                                    None,
4449                                    Some(&stmt_id.to_string()),
4450                                );
4451                            }
4452                            BuildersOrCallback::Callback(_, node_callback) => {
4453                                node_callback(node, next_stmt_id);
4454                            }
4455                        }
4456
4457                        ident_stack.push(enumerate_ident);
4458                    }
4459
4460                    HydroNode::Inspect { f, input, metadata } => {
4461                        let input_ident = ident_stack.pop().unwrap();
4462                        let f_tokens = f.emit_tokens(&mut ident_stack);
4463
4464                        let input_ident = maybe_observe_for_mut(
4465                            f, input_ident,
4466                            &input.metadata().location_id,
4467                            &input.metadata().collection_kind,
4468                            &metadata.op,
4469                            builders_or_callback, next_stmt_id,
4470                        );
4471
4472                        let stmt_id = next_stmt_id.get_and_increment();
4473                        let inspect_ident =
4474                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4475
4476                        match builders_or_callback {
4477                            BuildersOrCallback::Builders(graph_builders) => {
4478                                let builder = graph_builders.get_dfir_mut(&out_location);
4479                                builder.add_dfir(
4480                                    parse_quote! {
4481                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4482                                    },
4483                                    None,
4484                                    Some(&stmt_id.to_string()),
4485                                );
4486                            }
4487                            BuildersOrCallback::Callback(_, node_callback) => {
4488                                node_callback(node, next_stmt_id);
4489                            }
4490                        }
4491
4492                        ident_stack.push(inspect_ident);
4493                    }
4494
4495                    HydroNode::Unique { input, .. } => {
4496                        let input_ident = ident_stack.pop().unwrap();
4497
4498                        let stmt_id = next_stmt_id.get_and_increment();
4499                        let unique_ident =
4500                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4501
4502                        match builders_or_callback {
4503                            BuildersOrCallback::Builders(graph_builders) => {
4504                                let builder = graph_builders.get_dfir_mut(&out_location);
4505                                let lifetime = if input.metadata().location_id.is_top_level() {
4506                                    quote!('static)
4507                                } else {
4508                                    quote!('tick)
4509                                };
4510
4511                                builder.add_dfir(
4512                                    parse_quote! {
4513                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4514                                    },
4515                                    None,
4516                                    Some(&stmt_id.to_string()),
4517                                );
4518                            }
4519                            BuildersOrCallback::Callback(_, node_callback) => {
4520                                node_callback(node, next_stmt_id);
4521                            }
4522                        }
4523
4524                        ident_stack.push(unique_ident);
4525                    }
4526
4527                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4528                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4529                            if input.metadata().location_id.is_top_level()
4530                                && input.metadata().collection_kind.is_bounded()
4531                            {
4532                                parse_quote!(fold_no_replay)
4533                            } else {
4534                                parse_quote!(fold)
4535                            }
4536                        } else if matches!(node, HydroNode::Scan { .. }) {
4537                            parse_quote!(scan)
4538                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4539                            parse_quote!(scan_async_blocking)
4540                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4541                            if input.metadata().location_id.is_top_level()
4542                                && input.metadata().collection_kind.is_bounded()
4543                            {
4544                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4545                            } else {
4546                                parse_quote!(fold_keyed)
4547                            }
4548                        } else {
4549                            unreachable!()
4550                        };
4551
4552                        let (HydroNode::Fold { input, .. }
4553                        | HydroNode::FoldKeyed { input, .. }
4554                        | HydroNode::Scan { input, .. }
4555                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4556                        else {
4557                            unreachable!()
4558                        };
4559
4560                        let lifetime = if input.metadata().location_id.is_top_level() {
4561                            quote!('static)
4562                        } else {
4563                            quote!('tick)
4564                        };
4565
4566                        let input_ident = ident_stack.pop().unwrap();
4567
4568                        let (HydroNode::Fold { init, acc, .. }
4569                        | HydroNode::FoldKeyed { init, acc, .. }
4570                        | HydroNode::Scan { init, acc, .. }
4571                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4572                        else {
4573                            unreachable!()
4574                        };
4575
4576                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4577                        let init_tokens = init.emit_tokens(&mut ident_stack);
4578
4579                        let stmt_id = next_stmt_id.get_and_increment();
4580                        let fold_ident =
4581                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4582
4583                        match builders_or_callback {
4584                            BuildersOrCallback::Builders(graph_builders) => {
4585                                if matches!(node, HydroNode::Fold { .. })
4586                                    && node.metadata().location_id.is_top_level()
4587                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4588                                    && graph_builders.singleton_intermediates()
4589                                    && !node.metadata().collection_kind.is_bounded()
4590                                {
4591                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4592                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4593                                        &input.metadata().location_id,
4594                                        &input_ident,
4595                                        &input.metadata().collection_kind,
4596                                        &node.metadata().op,
4597                                    );
4598
4599                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4600                                        let acc: syn::Expr = parse_quote!({
4601                                            let mut __inner = #acc_tokens;
4602                                            move |__state, __batch: Vec<_>| {
4603                                                if __batch.is_empty() {
4604                                                    return None;
4605                                                }
4606                                                for __value in __batch {
4607                                                    __inner(__state, __value);
4608                                                }
4609                                                Some(__state.clone())
4610                                            }
4611                                        });
4612                                        (hooked, acc)
4613                                    } else {
4614                                        let acc: syn::Expr = parse_quote!({
4615                                            let mut __inner = #acc_tokens;
4616                                            move |__state, __value| {
4617                                                __inner(__state, __value);
4618                                                Some(__state.clone())
4619                                            }
4620                                        });
4621                                        (&input_ident, acc)
4622                                    };
4623
4624                                    let builder = graph_builders.get_dfir_mut(&out_location);
4625                                    builder.add_dfir(
4626                                        parse_quote! {
4627                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4628                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4629                                            #fold_ident = chain();
4630                                        },
4631                                        None,
4632                                        Some(&stmt_id.to_string()),
4633                                    );
4634
4635                                    if hooked_input_ident.is_some() {
4636                                        fold_hooked_idents.insert(fold_ident.to_string());
4637                                    }
4638                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4639                                    && node.metadata().location_id.is_top_level()
4640                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4641                                    && graph_builders.singleton_intermediates()
4642                                    && !node.metadata().collection_kind.is_bounded()
4643                                {
4644                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4645                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4646                                        &input.metadata().location_id,
4647                                        &input_ident,
4648                                        &input.metadata().collection_kind,
4649                                        &node.metadata().op,
4650                                    );
4651                                    let builder = graph_builders.get_dfir_mut(&out_location);
4652
4653                                    let wrapped_acc: syn::Expr = parse_quote!({
4654                                        let mut __init = #init_tokens;
4655                                        let mut __inner = #acc_tokens;
4656                                        move |__state, __kv: (_, _)| {
4657                                            // TODO(shadaj): we can avoid the clone when the entry exists
4658                                            let __state = __state
4659                                                .entry(::std::clone::Clone::clone(&__kv.0))
4660                                                .or_insert_with(|| (__init)());
4661                                            __inner(__state, __kv.1);
4662                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4663                                        }
4664                                    });
4665
4666                                    if let Some(hooked_input_ident) = hooked_input_ident {
4667                                        builder.add_dfir(
4668                                            parse_quote! {
4669                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4670                                            },
4671                                            None,
4672                                            Some(&stmt_id.to_string()),
4673                                        );
4674
4675                                        fold_hooked_idents.insert(fold_ident.to_string());
4676                                    } else {
4677                                        builder.add_dfir(
4678                                            parse_quote! {
4679                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4680                                            },
4681                                            None,
4682                                            Some(&stmt_id.to_string()),
4683                                        );
4684                                    }
4685                                } else if (matches!(node, HydroNode::Fold { .. })
4686                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4687                                    && !node.metadata().location_id.is_top_level()
4688                                    && graph_builders.singleton_intermediates()
4689                                {
4690                                    let input_ref = match &*node {
4691                                        HydroNode::Fold { input, .. } => input,
4692                                        HydroNode::FoldKeyed { input, .. } => input,
4693                                        _ => unreachable!(),
4694                                    };
4695                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4696                                        &input_ref.metadata().location_id,
4697                                        &input_ident,
4698                                        &input_ref.metadata().collection_kind,
4699                                        &node.metadata().op,
4700                                    );
4701
4702                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4703                                    let builder = graph_builders.get_dfir_mut(&out_location);
4704                                    builder.add_dfir(
4705                                        parse_quote! {
4706                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4707                                        },
4708                                        None,
4709                                        Some(&stmt_id.to_string()),
4710                                    );
4711                                } else {
4712                                    let builder = graph_builders.get_dfir_mut(&out_location);
4713                                    builder.add_dfir(
4714                                        parse_quote! {
4715                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4716                                        },
4717                                        None,
4718                                        Some(&stmt_id.to_string()),
4719                                    );
4720                                }
4721                            }
4722                            BuildersOrCallback::Callback(_, node_callback) => {
4723                                node_callback(node, next_stmt_id);
4724                            }
4725                        }
4726
4727                        ident_stack.push(fold_ident);
4728                    }
4729
4730                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4731                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4732                            if input.metadata().location_id.is_top_level()
4733                                && input.metadata().collection_kind.is_bounded()
4734                            {
4735                                parse_quote!(reduce_no_replay)
4736                            } else {
4737                                parse_quote!(reduce)
4738                            }
4739                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4740                            if input.metadata().location_id.is_top_level()
4741                                && input.metadata().collection_kind.is_bounded()
4742                            {
4743                                todo!(
4744                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4745                                )
4746                            } else {
4747                                parse_quote!(reduce_keyed)
4748                            }
4749                        } else {
4750                            unreachable!()
4751                        };
4752
4753                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4754                        else {
4755                            unreachable!()
4756                        };
4757
4758                        let lifetime = if input.metadata().location_id.is_top_level() {
4759                            quote!('static)
4760                        } else {
4761                            quote!('tick)
4762                        };
4763
4764                        let input_ident = ident_stack.pop().unwrap();
4765
4766                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4767                        else {
4768                            unreachable!()
4769                        };
4770
4771                        let f_tokens = f.emit_tokens(&mut ident_stack);
4772
4773                        let stmt_id = next_stmt_id.get_and_increment();
4774                        let reduce_ident =
4775                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4776
4777                        match builders_or_callback {
4778                            BuildersOrCallback::Builders(graph_builders) => {
4779                                if matches!(node, HydroNode::Reduce { .. })
4780                                    && node.metadata().location_id.is_top_level()
4781                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4782                                    && graph_builders.singleton_intermediates()
4783                                    && !node.metadata().collection_kind.is_bounded()
4784                                {
4785                                    todo!(
4786                                        "Reduce with optional intermediates is not yet supported in simulator"
4787                                    );
4788                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4789                                    && node.metadata().location_id.is_top_level()
4790                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4791                                    && graph_builders.singleton_intermediates()
4792                                    && !node.metadata().collection_kind.is_bounded()
4793                                {
4794                                    todo!(
4795                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4796                                    );
4797                                } else {
4798                                    let builder = graph_builders.get_dfir_mut(&out_location);
4799                                    builder.add_dfir(
4800                                        parse_quote! {
4801                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4802                                        },
4803                                        None,
4804                                        Some(&stmt_id.to_string()),
4805                                    );
4806                                }
4807                            }
4808                            BuildersOrCallback::Callback(_, node_callback) => {
4809                                node_callback(node, next_stmt_id);
4810                            }
4811                        }
4812
4813                        ident_stack.push(reduce_ident);
4814                    }
4815
4816                    HydroNode::ReduceKeyedWatermark {
4817                        f,
4818                        input,
4819                        metadata,
4820                        ..
4821                    } => {
4822                        let lifetime = if input.metadata().location_id.is_top_level() {
4823                            quote!('static)
4824                        } else {
4825                            quote!('tick)
4826                        };
4827
4828                        // watermark is processed second, so it's on top
4829                        let watermark_ident = ident_stack.pop().unwrap();
4830                        let input_ident = ident_stack.pop().unwrap();
4831                        let f_tokens = f.emit_tokens(&mut ident_stack);
4832
4833                        let stmt_id = next_stmt_id.get_and_increment();
4834                        let chain_ident = syn::Ident::new(
4835                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4836                            Span::call_site(),
4837                        );
4838
4839                        let fold_ident =
4840                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4841
4842                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4843                            && input.metadata().collection_kind.is_bounded()
4844                        {
4845                            parse_quote!(fold_no_replay)
4846                        } else {
4847                            parse_quote!(fold)
4848                        };
4849
4850                        match builders_or_callback {
4851                            BuildersOrCallback::Builders(graph_builders) => {
4852                                if metadata.location_id.is_top_level()
4853                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4854                                    && graph_builders.singleton_intermediates()
4855                                    && !metadata.collection_kind.is_bounded()
4856                                {
4857                                    todo!(
4858                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4859                                    )
4860                                } else {
4861                                    let builder = graph_builders.get_dfir_mut(&out_location);
4862                                    builder.add_dfir(
4863                                        parse_quote! {
4864                                            #chain_ident = chain();
4865                                            #input_ident
4866                                                -> map(|x| (Some(x), None))
4867                                                -> [0]#chain_ident;
4868                                            #watermark_ident
4869                                                -> map(|watermark| (None, Some(watermark)))
4870                                                -> [1]#chain_ident;
4871
4872                                            #fold_ident = #chain_ident
4873                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4874                                                    let __reduce_keyed_fn = #f_tokens;
4875                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4876                                                        if let Some((k, v)) = opt_payload {
4877                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4878                                                                if k < curr_watermark {
4879                                                                    return;
4880                                                                }
4881                                                            }
4882                                                            match map.entry(k) {
4883                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4884                                                                    e.insert(v);
4885                                                                }
4886                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4887                                                                    __reduce_keyed_fn(e.get_mut(), v);
4888                                                                }
4889                                                            }
4890                                                        } else {
4891                                                            let watermark = opt_watermark.unwrap();
4892                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4893                                                                if watermark <= curr_watermark {
4894                                                                    return;
4895                                                                }
4896                                                            }
4897                                                            map.retain(|k, _| *k >= watermark);
4898                                                            *opt_curr_watermark = Some(watermark);
4899                                                        }
4900                                                    }
4901                                                })
4902                                                -> flat_map(|(map, _curr_watermark)| map);
4903                                        },
4904                                        None,
4905                                        Some(&stmt_id.to_string()),
4906                                    );
4907                                }
4908                            }
4909                            BuildersOrCallback::Callback(_, node_callback) => {
4910                                node_callback(node, next_stmt_id);
4911                            }
4912                        }
4913
4914                        ident_stack.push(fold_ident);
4915                    }
4916
4917                    HydroNode::Network {
4918                        networking_info,
4919                        serialize_fn: serialize_pipeline,
4920                        instantiate_fn,
4921                        deserialize_fn: deserialize_pipeline,
4922                        input,
4923                        ..
4924                    } => {
4925                        let input_ident = ident_stack.pop().unwrap();
4926
4927                        let stmt_id = next_stmt_id.get_and_increment();
4928                        let receiver_stream_ident =
4929                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4930
4931                        match builders_or_callback {
4932                            BuildersOrCallback::Builders(graph_builders) => {
4933                                let (sink_expr, source_expr) = match instantiate_fn {
4934                                    DebugInstantiate::Building => (
4935                                        syn::parse_quote!(DUMMY_SINK),
4936                                        syn::parse_quote!(DUMMY_SOURCE),
4937                                    ),
4938
4939                                    DebugInstantiate::Finalized(finalized) => {
4940                                        (finalized.sink.clone(), finalized.source.clone())
4941                                    }
4942                                };
4943
4944                                graph_builders.create_network(
4945                                    &input.metadata().location_id,
4946                                    &out_location,
4947                                    input_ident,
4948                                    &receiver_stream_ident,
4949                                    serialize_pipeline.as_ref(),
4950                                    sink_expr,
4951                                    source_expr,
4952                                    deserialize_pipeline.as_ref(),
4953                                    stmt_id,
4954                                    networking_info,
4955                                );
4956                            }
4957                            BuildersOrCallback::Callback(_, node_callback) => {
4958                                node_callback(node, next_stmt_id);
4959                            }
4960                        }
4961
4962                        ident_stack.push(receiver_stream_ident);
4963                    }
4964
4965                    HydroNode::ExternalInput {
4966                        instantiate_fn,
4967                        deserialize_fn: deserialize_pipeline,
4968                        ..
4969                    } => {
4970                        let stmt_id = next_stmt_id.get_and_increment();
4971                        let receiver_stream_ident =
4972                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4973
4974                        match builders_or_callback {
4975                            BuildersOrCallback::Builders(graph_builders) => {
4976                                let (_, source_expr) = match instantiate_fn {
4977                                    DebugInstantiate::Building => (
4978                                        syn::parse_quote!(DUMMY_SINK),
4979                                        syn::parse_quote!(DUMMY_SOURCE),
4980                                    ),
4981
4982                                    DebugInstantiate::Finalized(finalized) => {
4983                                        (finalized.sink.clone(), finalized.source.clone())
4984                                    }
4985                                };
4986
4987                                graph_builders.create_external_source(
4988                                    &out_location,
4989                                    source_expr,
4990                                    &receiver_stream_ident,
4991                                    deserialize_pipeline.as_ref(),
4992                                    stmt_id,
4993                                );
4994                            }
4995                            BuildersOrCallback::Callback(_, node_callback) => {
4996                                node_callback(node, next_stmt_id);
4997                            }
4998                        }
4999
5000                        ident_stack.push(receiver_stream_ident);
5001                    }
5002
5003                    HydroNode::Counter {
5004                        tag,
5005                        duration,
5006                        prefix,
5007                        ..
5008                    } => {
5009                        let input_ident = ident_stack.pop().unwrap();
5010
5011                        let stmt_id = next_stmt_id.get_and_increment();
5012                        let counter_ident =
5013                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5014
5015                        match builders_or_callback {
5016                            BuildersOrCallback::Builders(graph_builders) => {
5017                                let arg = format!("{}({})", prefix, tag);
5018                                let builder = graph_builders.get_dfir_mut(&out_location);
5019                                builder.add_dfir(
5020                                    parse_quote! {
5021                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5022                                    },
5023                                    None,
5024                                    Some(&stmt_id.to_string()),
5025                                );
5026                            }
5027                            BuildersOrCallback::Callback(_, node_callback) => {
5028                                node_callback(node, next_stmt_id);
5029                            }
5030                        }
5031
5032                        ident_stack.push(counter_ident);
5033                    }
5034                }
5035            },
5036            seen_tees,
5037            false,
5038        );
5039
5040        let ret = ident_stack
5041            .pop()
5042            .expect("ident_stack should have exactly one element after traversal");
5043        assert!(
5044            ident_stack.is_empty(),
5045            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5046             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5047            ident_stack.len()
5048        );
5049        ret
5050    }
5051
5052    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5053        match self {
5054            HydroNode::Placeholder => {
5055                panic!()
5056            }
5057            HydroNode::Cast { .. }
5058            | HydroNode::ObserveNonDet { .. }
5059            | HydroNode::UnboundSingleton { .. }
5060            | HydroNode::AssertIsConsistent { .. } => {}
5061            HydroNode::Source { source, .. } => match source {
5062                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5063                HydroSource::ExternalNetwork()
5064                | HydroSource::Spin()
5065                | HydroSource::ClusterMembers(_, _)
5066                | HydroSource::Embedded(_)
5067                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5068            },
5069            HydroNode::SingletonSource { value, .. } => {
5070                transform(value);
5071            }
5072            HydroNode::CycleSource { .. }
5073            | HydroNode::Tee { .. }
5074            | HydroNode::Reference { .. }
5075            | HydroNode::YieldConcat { .. }
5076            | HydroNode::BeginAtomic { .. }
5077            | HydroNode::EndAtomic { .. }
5078            | HydroNode::Batch { .. }
5079            | HydroNode::Chain { .. }
5080            | HydroNode::MergeOrdered { .. }
5081            | HydroNode::ChainFirst { .. }
5082            | HydroNode::CrossProduct { .. }
5083            | HydroNode::CrossSingleton { .. }
5084            | HydroNode::ResolveFutures { .. }
5085            | HydroNode::ResolveFuturesBlocking { .. }
5086            | HydroNode::ResolveFuturesOrdered { .. }
5087            | HydroNode::Join { .. }
5088            | HydroNode::JoinHalf { .. }
5089            | HydroNode::Difference { .. }
5090            | HydroNode::AntiJoin { .. }
5091            | HydroNode::DeferTick { .. }
5092            | HydroNode::Enumerate { .. }
5093            | HydroNode::Unique { .. }
5094            | HydroNode::Sort { .. } => {}
5095            HydroNode::Map { f, .. }
5096            | HydroNode::FlatMap { f, .. }
5097            | HydroNode::FlatMapStreamBlocking { f, .. }
5098            | HydroNode::Filter { f, .. }
5099            | HydroNode::FilterMap { f, .. }
5100            | HydroNode::Inspect { f, .. }
5101            | HydroNode::Partition { f, .. }
5102            | HydroNode::Reduce { f, .. }
5103            | HydroNode::ReduceKeyed { f, .. }
5104            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5105                transform(&mut f.expr);
5106            }
5107            HydroNode::Fold { init, acc, .. }
5108            | HydroNode::Scan { init, acc, .. }
5109            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5110            | HydroNode::FoldKeyed { init, acc, .. } => {
5111                transform(&mut init.expr);
5112                transform(&mut acc.expr);
5113            }
5114            HydroNode::Network {
5115                serialize_fn,
5116                deserialize_fn,
5117                ..
5118            } => {
5119                if let Some(serialize_fn) = serialize_fn {
5120                    transform(serialize_fn);
5121                }
5122                if let Some(deserialize_fn) = deserialize_fn {
5123                    transform(deserialize_fn);
5124                }
5125            }
5126            HydroNode::ExternalInput { deserialize_fn, .. } => {
5127                if let Some(deserialize_fn) = deserialize_fn {
5128                    transform(deserialize_fn);
5129                }
5130            }
5131            HydroNode::Counter { duration, .. } => {
5132                transform(duration);
5133            }
5134        }
5135    }
5136
5137    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5138        &self.metadata().op
5139    }
5140
5141    pub fn metadata(&self) -> &HydroIrMetadata {
5142        match self {
5143            HydroNode::Placeholder => {
5144                panic!()
5145            }
5146            HydroNode::Cast { metadata, .. }
5147            | HydroNode::ObserveNonDet { metadata, .. }
5148            | HydroNode::AssertIsConsistent { metadata, .. }
5149            | HydroNode::UnboundSingleton { metadata, .. }
5150            | HydroNode::Source { metadata, .. }
5151            | HydroNode::SingletonSource { metadata, .. }
5152            | HydroNode::CycleSource { metadata, .. }
5153            | HydroNode::Tee { metadata, .. }
5154            | HydroNode::Reference { metadata, .. }
5155            | HydroNode::Partition { metadata, .. }
5156            | HydroNode::YieldConcat { metadata, .. }
5157            | HydroNode::BeginAtomic { metadata, .. }
5158            | HydroNode::EndAtomic { metadata, .. }
5159            | HydroNode::Batch { metadata, .. }
5160            | HydroNode::Chain { metadata, .. }
5161            | HydroNode::MergeOrdered { metadata, .. }
5162            | HydroNode::ChainFirst { metadata, .. }
5163            | HydroNode::CrossProduct { metadata, .. }
5164            | HydroNode::CrossSingleton { metadata, .. }
5165            | HydroNode::Join { metadata, .. }
5166            | HydroNode::JoinHalf { metadata, .. }
5167            | HydroNode::Difference { metadata, .. }
5168            | HydroNode::AntiJoin { metadata, .. }
5169            | HydroNode::ResolveFutures { metadata, .. }
5170            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5171            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5172            | HydroNode::Map { metadata, .. }
5173            | HydroNode::FlatMap { metadata, .. }
5174            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5175            | HydroNode::Filter { metadata, .. }
5176            | HydroNode::FilterMap { metadata, .. }
5177            | HydroNode::DeferTick { metadata, .. }
5178            | HydroNode::Enumerate { metadata, .. }
5179            | HydroNode::Inspect { metadata, .. }
5180            | HydroNode::Unique { metadata, .. }
5181            | HydroNode::Sort { metadata, .. }
5182            | HydroNode::Scan { metadata, .. }
5183            | HydroNode::ScanAsyncBlocking { metadata, .. }
5184            | HydroNode::Fold { metadata, .. }
5185            | HydroNode::FoldKeyed { metadata, .. }
5186            | HydroNode::Reduce { metadata, .. }
5187            | HydroNode::ReduceKeyed { metadata, .. }
5188            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5189            | HydroNode::ExternalInput { metadata, .. }
5190            | HydroNode::Network { metadata, .. }
5191            | HydroNode::Counter { metadata, .. } => metadata,
5192        }
5193    }
5194
5195    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5196        &mut self.metadata_mut().op
5197    }
5198
5199    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5200        match self {
5201            HydroNode::Placeholder => {
5202                panic!()
5203            }
5204            HydroNode::Cast { metadata, .. }
5205            | HydroNode::ObserveNonDet { metadata, .. }
5206            | HydroNode::AssertIsConsistent { metadata, .. }
5207            | HydroNode::UnboundSingleton { metadata, .. }
5208            | HydroNode::Source { metadata, .. }
5209            | HydroNode::SingletonSource { metadata, .. }
5210            | HydroNode::CycleSource { metadata, .. }
5211            | HydroNode::Tee { metadata, .. }
5212            | HydroNode::Reference { metadata, .. }
5213            | HydroNode::Partition { metadata, .. }
5214            | HydroNode::YieldConcat { metadata, .. }
5215            | HydroNode::BeginAtomic { metadata, .. }
5216            | HydroNode::EndAtomic { metadata, .. }
5217            | HydroNode::Batch { metadata, .. }
5218            | HydroNode::Chain { metadata, .. }
5219            | HydroNode::MergeOrdered { metadata, .. }
5220            | HydroNode::ChainFirst { metadata, .. }
5221            | HydroNode::CrossProduct { metadata, .. }
5222            | HydroNode::CrossSingleton { metadata, .. }
5223            | HydroNode::Join { metadata, .. }
5224            | HydroNode::JoinHalf { metadata, .. }
5225            | HydroNode::Difference { metadata, .. }
5226            | HydroNode::AntiJoin { metadata, .. }
5227            | HydroNode::ResolveFutures { metadata, .. }
5228            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5229            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5230            | HydroNode::Map { metadata, .. }
5231            | HydroNode::FlatMap { metadata, .. }
5232            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5233            | HydroNode::Filter { metadata, .. }
5234            | HydroNode::FilterMap { metadata, .. }
5235            | HydroNode::DeferTick { metadata, .. }
5236            | HydroNode::Enumerate { metadata, .. }
5237            | HydroNode::Inspect { metadata, .. }
5238            | HydroNode::Unique { metadata, .. }
5239            | HydroNode::Sort { metadata, .. }
5240            | HydroNode::Scan { metadata, .. }
5241            | HydroNode::ScanAsyncBlocking { metadata, .. }
5242            | HydroNode::Fold { metadata, .. }
5243            | HydroNode::FoldKeyed { metadata, .. }
5244            | HydroNode::Reduce { metadata, .. }
5245            | HydroNode::ReduceKeyed { metadata, .. }
5246            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5247            | HydroNode::ExternalInput { metadata, .. }
5248            | HydroNode::Network { metadata, .. }
5249            | HydroNode::Counter { metadata, .. } => metadata,
5250        }
5251    }
5252
5253    pub fn input(&self) -> Vec<&HydroNode> {
5254        match self {
5255            HydroNode::Placeholder => {
5256                panic!()
5257            }
5258            HydroNode::Source { .. }
5259            | HydroNode::SingletonSource { .. }
5260            | HydroNode::ExternalInput { .. }
5261            | HydroNode::CycleSource { .. }
5262            | HydroNode::Tee { .. }
5263            | HydroNode::Reference { .. }
5264            | HydroNode::Partition { .. } => {
5265                // Tee/Partition should find their input in separate special ways
5266                vec![]
5267            }
5268            HydroNode::Cast { inner, .. }
5269            | HydroNode::ObserveNonDet { inner, .. }
5270            | HydroNode::YieldConcat { inner, .. }
5271            | HydroNode::BeginAtomic { inner, .. }
5272            | HydroNode::EndAtomic { inner, .. }
5273            | HydroNode::Batch { inner, .. }
5274            | HydroNode::UnboundSingleton { inner, .. }
5275            | HydroNode::AssertIsConsistent { inner, .. } => {
5276                vec![inner]
5277            }
5278            HydroNode::Chain { first, second, .. } => {
5279                vec![first, second]
5280            }
5281            HydroNode::MergeOrdered { first, second, .. } => {
5282                vec![first, second]
5283            }
5284            HydroNode::ChainFirst { first, second, .. } => {
5285                vec![first, second]
5286            }
5287            HydroNode::CrossProduct { left, right, .. }
5288            | HydroNode::CrossSingleton { left, right, .. }
5289            | HydroNode::Join { left, right, .. }
5290            | HydroNode::JoinHalf { left, right, .. } => {
5291                vec![left, right]
5292            }
5293            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5294                vec![pos, neg]
5295            }
5296            HydroNode::Map { input, .. }
5297            | HydroNode::FlatMap { input, .. }
5298            | HydroNode::FlatMapStreamBlocking { input, .. }
5299            | HydroNode::Filter { input, .. }
5300            | HydroNode::FilterMap { input, .. }
5301            | HydroNode::Sort { input, .. }
5302            | HydroNode::DeferTick { input, .. }
5303            | HydroNode::Enumerate { input, .. }
5304            | HydroNode::Inspect { input, .. }
5305            | HydroNode::Unique { input, .. }
5306            | HydroNode::Network { input, .. }
5307            | HydroNode::Counter { input, .. }
5308            | HydroNode::ResolveFutures { input, .. }
5309            | HydroNode::ResolveFuturesBlocking { input, .. }
5310            | HydroNode::ResolveFuturesOrdered { input, .. }
5311            | HydroNode::Fold { input, .. }
5312            | HydroNode::FoldKeyed { input, .. }
5313            | HydroNode::Reduce { input, .. }
5314            | HydroNode::ReduceKeyed { input, .. }
5315            | HydroNode::Scan { input, .. }
5316            | HydroNode::ScanAsyncBlocking { input, .. } => {
5317                vec![input]
5318            }
5319            HydroNode::ReduceKeyedWatermark {
5320                input, watermark, ..
5321            } => {
5322                vec![input, watermark]
5323            }
5324        }
5325    }
5326
5327    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5328        self.input()
5329            .iter()
5330            .map(|input_node| input_node.metadata())
5331            .collect()
5332    }
5333
5334    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5335    /// has other live references, meaning the upstream is already driven
5336    /// by another consumer and does not need a Null sink.
5337    pub fn is_shared_with_others(&self) -> bool {
5338        match self {
5339            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5340                Rc::strong_count(&inner.0) > 1
5341            }
5342            // A zero-output reference node is valid in DFIR (it drains itself at
5343            // end of tick), so it doesn't need to be driven by another consumer.
5344            HydroNode::Reference { .. } => false,
5345            _ => false,
5346        }
5347    }
5348
5349    pub fn print_root(&self) -> String {
5350        match self {
5351            HydroNode::Placeholder => {
5352                panic!()
5353            }
5354            HydroNode::Cast { .. } => "Cast()".to_owned(),
5355            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5356            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5357            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5358            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5359            HydroNode::SingletonSource {
5360                value,
5361                first_tick_only,
5362                ..
5363            } => format!(
5364                "SingletonSource({:?}, first_tick_only={})",
5365                value, first_tick_only
5366            ),
5367            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5368            HydroNode::Tee { inner, .. } => {
5369                format!("Tee({})", inner.0.borrow().print_root())
5370            }
5371            HydroNode::Reference { inner, kind, .. } => {
5372                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5373            }
5374            HydroNode::Partition { f, is_true, .. } => {
5375                format!("Partition({:?}, is_true={})", f, is_true)
5376            }
5377            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5378            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5379            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5380            HydroNode::Batch { .. } => "Batch()".to_owned(),
5381            HydroNode::Chain { first, second, .. } => {
5382                format!("Chain({}, {})", first.print_root(), second.print_root())
5383            }
5384            HydroNode::MergeOrdered { first, second, .. } => {
5385                format!(
5386                    "MergeOrdered({}, {})",
5387                    first.print_root(),
5388                    second.print_root()
5389                )
5390            }
5391            HydroNode::ChainFirst { first, second, .. } => {
5392                format!(
5393                    "ChainFirst({}, {})",
5394                    first.print_root(),
5395                    second.print_root()
5396                )
5397            }
5398            HydroNode::CrossProduct { left, right, .. } => {
5399                format!(
5400                    "CrossProduct({}, {})",
5401                    left.print_root(),
5402                    right.print_root()
5403                )
5404            }
5405            HydroNode::CrossSingleton { left, right, .. } => {
5406                format!(
5407                    "CrossSingleton({}, {})",
5408                    left.print_root(),
5409                    right.print_root()
5410                )
5411            }
5412            HydroNode::Join { left, right, .. } => {
5413                format!("Join({}, {})", left.print_root(), right.print_root())
5414            }
5415            HydroNode::JoinHalf { left, right, .. } => {
5416                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5417            }
5418            HydroNode::Difference { pos, neg, .. } => {
5419                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5420            }
5421            HydroNode::AntiJoin { pos, neg, .. } => {
5422                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5423            }
5424            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5425            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5426            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5427            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5428            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5429            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5430            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5431            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5432            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5433            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5434            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5435            HydroNode::Unique { .. } => "Unique()".to_owned(),
5436            HydroNode::Sort { .. } => "Sort()".to_owned(),
5437            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5438            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5439            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5440                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5441            }
5442            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5443            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5444            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5445            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5446            HydroNode::Network { .. } => "Network()".to_owned(),
5447            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5448            HydroNode::Counter { tag, duration, .. } => {
5449                format!("Counter({:?}, {:?})", tag, duration)
5450            }
5451        }
5452    }
5453}
5454
5455#[cfg(feature = "build")]
5456fn instantiate_network<'a, D>(
5457    env: &mut D::InstantiateEnv,
5458    from_location: &LocationId,
5459    to_location: &LocationId,
5460    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5461    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5462    name: Option<&str>,
5463    networking_info: &crate::networking::NetworkingInfo,
5464) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5465where
5466    D: Deploy<'a>,
5467{
5468    let ((sink, source), connect_fn) = match (from_location, to_location) {
5469        (&LocationId::Process(from), &LocationId::Process(to)) => {
5470            let from_node = processes
5471                .get(from)
5472                .unwrap_or_else(|| {
5473                    panic!("A process used in the graph was not instantiated: {}", from)
5474                })
5475                .clone();
5476            let to_node = processes
5477                .get(to)
5478                .unwrap_or_else(|| {
5479                    panic!("A process used in the graph was not instantiated: {}", to)
5480                })
5481                .clone();
5482
5483            let sink_port = from_node.next_port();
5484            let source_port = to_node.next_port();
5485
5486            (
5487                D::o2o_sink_source(
5488                    env,
5489                    &from_node,
5490                    &sink_port,
5491                    &to_node,
5492                    &source_port,
5493                    name,
5494                    networking_info,
5495                ),
5496                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5497            )
5498        }
5499        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5500            let from_node = processes
5501                .get(from)
5502                .unwrap_or_else(|| {
5503                    panic!("A process used in the graph was not instantiated: {}", from)
5504                })
5505                .clone();
5506            let to_node = clusters
5507                .get(to)
5508                .unwrap_or_else(|| {
5509                    panic!("A cluster used in the graph was not instantiated: {}", to)
5510                })
5511                .clone();
5512
5513            let sink_port = from_node.next_port();
5514            let source_port = to_node.next_port();
5515
5516            (
5517                D::o2m_sink_source(
5518                    env,
5519                    &from_node,
5520                    &sink_port,
5521                    &to_node,
5522                    &source_port,
5523                    name,
5524                    networking_info,
5525                ),
5526                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5527            )
5528        }
5529        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5530            let from_node = clusters
5531                .get(from)
5532                .unwrap_or_else(|| {
5533                    panic!("A cluster used in the graph was not instantiated: {}", from)
5534                })
5535                .clone();
5536            let to_node = processes
5537                .get(to)
5538                .unwrap_or_else(|| {
5539                    panic!("A process used in the graph was not instantiated: {}", to)
5540                })
5541                .clone();
5542
5543            let sink_port = from_node.next_port();
5544            let source_port = to_node.next_port();
5545
5546            (
5547                D::m2o_sink_source(
5548                    env,
5549                    &from_node,
5550                    &sink_port,
5551                    &to_node,
5552                    &source_port,
5553                    name,
5554                    networking_info,
5555                ),
5556                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5557            )
5558        }
5559        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5560            let from_node = clusters
5561                .get(from)
5562                .unwrap_or_else(|| {
5563                    panic!("A cluster used in the graph was not instantiated: {}", from)
5564                })
5565                .clone();
5566            let to_node = clusters
5567                .get(to)
5568                .unwrap_or_else(|| {
5569                    panic!("A cluster used in the graph was not instantiated: {}", to)
5570                })
5571                .clone();
5572
5573            let sink_port = from_node.next_port();
5574            let source_port = to_node.next_port();
5575
5576            (
5577                D::m2m_sink_source(
5578                    env,
5579                    &from_node,
5580                    &sink_port,
5581                    &to_node,
5582                    &source_port,
5583                    name,
5584                    networking_info,
5585                ),
5586                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5587            )
5588        }
5589        (LocationId::Tick(_, _), _) => panic!(),
5590        (_, LocationId::Tick(_, _)) => panic!(),
5591        (LocationId::Atomic(_), _) => panic!(),
5592        (_, LocationId::Atomic(_)) => panic!(),
5593    };
5594    (sink, source, connect_fn)
5595}
5596
5597#[cfg(test)]
5598mod serde_test;
5599
5600#[cfg(test)]
5601mod test {
5602    use std::mem::size_of;
5603
5604    use stageleft::{QuotedWithContext, q};
5605
5606    use super::*;
5607
5608    #[test]
5609    #[cfg_attr(
5610        not(feature = "build"),
5611        ignore = "expects inclusion of feature-gated fields"
5612    )]
5613    fn hydro_node_size() {
5614        assert_eq!(size_of::<HydroNode>(), 264);
5615    }
5616
5617    #[test]
5618    #[cfg_attr(
5619        not(feature = "build"),
5620        ignore = "expects inclusion of feature-gated fields"
5621    )]
5622    fn hydro_root_size() {
5623        assert_eq!(size_of::<HydroRoot>(), 136);
5624    }
5625
5626    #[test]
5627    fn test_simplify_q_macro_basic() {
5628        // Test basic non-q! expression
5629        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5630        let result = simplify_q_macro(simple_expr.clone());
5631        assert_eq!(result, simple_expr);
5632    }
5633
5634    #[test]
5635    fn test_simplify_q_macro_actual_stageleft_call() {
5636        // Test a simplified version of what a real stageleft call might look like
5637        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5638        let result = simplify_q_macro(stageleft_call);
5639        // This should be processed by our visitor and simplified to q!(...)
5640        // since we detect the stageleft::runtime_support::fn_* pattern
5641        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5642    }
5643
5644    #[test]
5645    fn test_closure_no_pipe_at_start() {
5646        // Test a closure that does not start with a pipe
5647        let stageleft_call = q!({
5648            let foo = 123;
5649            move |b: usize| b + foo
5650        })
5651        .splice_fn1_ctx(&());
5652        let result = simplify_q_macro(stageleft_call);
5653        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5654    }
5655}