1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45 serializer.serialize_str(&self.to_string())
46 }
47}
48
49impl From<syn::Expr> for DebugExpr {
50 fn from(expr: syn::Expr) -> Self {
51 Self(Box::new(expr))
52 }
53}
54
55impl Deref for DebugExpr {
56 type Target = syn::Expr;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63impl ToTokens for DebugExpr {
64 fn to_tokens(&self, tokens: &mut TokenStream) {
65 self.0.to_tokens(tokens);
66 }
67}
68
69impl Debug for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{}", self.0.to_token_stream())
72 }
73}
74
75impl Display for DebugExpr {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let original = self.0.as_ref().clone();
78 let simplified = simplify_q_macro(original);
79
80 write!(f, "q!({})", quote::quote!(#simplified))
83 }
84}
85
86fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88 let mut simplifier = QMacroSimplifier::new();
91 simplifier.visit_expr_mut(&mut expr);
92
93 if let Some(simplified) = simplifier.simplified_result {
95 simplified
96 } else {
97 expr
98 }
99}
100
101#[derive(Default)]
103pub struct QMacroSimplifier {
104 pub simplified_result: Option<syn::Expr>,
105}
106
107impl QMacroSimplifier {
108 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113impl VisitMut for QMacroSimplifier {
114 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115 if self.simplified_result.is_some() {
117 return;
118 }
119
120 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121 && self.is_stageleft_runtime_support_call(&path_expr.path)
123 && let Some(closure) = self.extract_closure_from_args(&call.args)
125 {
126 self.simplified_result = Some(closure);
127 return;
128 }
129
130 syn::visit_mut::visit_expr_mut(self, expr);
133 }
134}
135
136impl QMacroSimplifier {
137 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138 if let Some(last_segment) = path.segments.last() {
140 let fn_name = last_segment.ident.to_string();
141 fn_name.contains("_type_hint")
143 && path.segments.len() > 2
144 && path.segments[0].ident == "stageleft"
145 && path.segments[1].ident == "runtime_support"
146 } else {
147 false
148 }
149 }
150
151 fn extract_closure_from_args(
152 &self,
153 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154 ) -> Option<syn::Expr> {
155 for arg in args {
157 if let syn::Expr::Closure(_) = arg {
158 return Some(arg.clone());
159 }
160 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162 return Some(closure_expr);
163 }
164 }
165 None
166 }
167
168 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169 let mut visitor = ClosureFinder {
170 found_closure: None,
171 prefer_inner_blocks: true,
172 };
173 visitor.visit_expr(expr);
174 visitor.found_closure
175 }
176}
177
178struct ClosureFinder {
180 found_closure: Option<syn::Expr>,
181 prefer_inner_blocks: bool,
182}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186 if self.found_closure.is_some() {
188 return;
189 }
190
191 match expr {
192 syn::Expr::Closure(_) => {
193 self.found_closure = Some(expr.clone());
194 }
195 syn::Expr::Block(block) if self.prefer_inner_blocks => {
196 for stmt in &block.block.stmts {
198 if let syn::Stmt::Expr(stmt_expr, _) = stmt
199 && let syn::Expr::Block(_) = stmt_expr
200 {
201 let mut inner_visitor = ClosureFinder {
203 found_closure: None,
204 prefer_inner_blocks: false, };
206 inner_visitor.visit_expr(stmt_expr);
207 if inner_visitor.found_closure.is_some() {
208 self.found_closure = Some(stmt_expr.clone());
210 return;
211 }
212 }
213 }
214
215 visit::visit_expr(self, expr);
217
218 if self.found_closure.is_some() {
221 }
223 }
224 _ => {
225 visit::visit_expr(self, expr);
227 }
228 }
229 }
230}
231
232#[derive(Clone, PartialEq, Eq, Hash)]
236pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239 fn from(t: syn::Type) -> Self {
240 Self(Box::new(t))
241 }
242}
243
244impl Deref for DebugType {
245 type Target = syn::Type;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
252impl ToTokens for DebugType {
253 fn to_tokens(&self, tokens: &mut TokenStream) {
254 self.0.to_tokens(tokens);
255 }
256}
257
258impl Debug for DebugType {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 write!(f, "{}", self.0.to_token_stream())
261 }
262}
263
264impl serde::Serialize for DebugType {
265 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267 }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271 backtrace: &Backtrace,
272 serializer: S,
273) -> Result<S::Ok, S::Error> {
274 match backtrace.format_span() {
275 Some(span) => serializer.serialize_some(&span),
276 None => serializer.serialize_none(),
277 }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281 ident: &syn::Ident,
282 serializer: S,
283) -> Result<S::Ok, S::Error> {
284 serializer.serialize_str(&ident.to_string())
285}
286
287pub enum DebugInstantiate {
288 Building,
289 Finalized(Box<DebugInstantiateFinalized>),
290}
291
292impl serde::Serialize for DebugInstantiate {
293 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294 match self {
295 DebugInstantiate::Building => {
296 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297 }
298 DebugInstantiate::Finalized(_) => {
299 panic!(
300 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301 )
302 }
303 }
304 }
305}
306
307#[cfg_attr(
308 not(feature = "build"),
309 expect(
310 dead_code,
311 reason = "sink, source unused without `feature = \"build\"`."
312 )
313)]
314pub struct DebugInstantiateFinalized {
315 sink: syn::Expr,
316 source: syn::Expr,
317 connect_fn: Option<Box<dyn FnOnce()>>,
318}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321 fn from(f: DebugInstantiateFinalized) -> Self {
322 Self::Finalized(Box::new(f))
323 }
324}
325
326impl Debug for DebugInstantiate {
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 write!(f, "<network instantiate>")
329 }
330}
331
332impl Hash for DebugInstantiate {
333 fn hash<H: Hasher>(&self, _state: &mut H) {
334 }
336}
337
338impl Clone for DebugInstantiate {
339 fn clone(&self) -> Self {
340 match self {
341 DebugInstantiate::Building => DebugInstantiate::Building,
342 DebugInstantiate::Finalized(_) => {
343 panic!("DebugInstantiate::Finalized should not be cloned")
344 }
345 }
346 }
347}
348
349#[derive(Debug, Hash, Clone, serde::Serialize)]
358pub enum ClusterMembersState {
359 Uninit,
361 Stream(DebugExpr),
364 Tee(LocationId, LocationId),
368}
369
370#[derive(Debug, Hash, Clone, serde::Serialize)]
372pub enum HydroSource {
373 Stream(DebugExpr),
374 ExternalNetwork(),
375 Iter(DebugExpr),
376 Spin(),
377 ClusterMembers(LocationId, ClusterMembersState),
378 Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
379 EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
380}
381
382#[cfg(feature = "build")]
383pub trait DfirBuilder {
389 fn singleton_intermediates(&self) -> bool;
391
392 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
394
395 fn batch(
396 &mut self,
397 in_ident: syn::Ident,
398 in_location: &LocationId,
399 in_kind: &CollectionKind,
400 out_ident: &syn::Ident,
401 out_location: &LocationId,
402 op_meta: &HydroIrOpMetadata,
403 );
404 fn yield_from_tick(
405 &mut self,
406 in_ident: syn::Ident,
407 in_location: &LocationId,
408 in_kind: &CollectionKind,
409 out_ident: &syn::Ident,
410 out_location: &LocationId,
411 );
412
413 fn begin_atomic(
414 &mut self,
415 in_ident: syn::Ident,
416 in_location: &LocationId,
417 in_kind: &CollectionKind,
418 out_ident: &syn::Ident,
419 out_location: &LocationId,
420 op_meta: &HydroIrOpMetadata,
421 );
422 fn end_atomic(
423 &mut self,
424 in_ident: syn::Ident,
425 in_location: &LocationId,
426 in_kind: &CollectionKind,
427 out_ident: &syn::Ident,
428 );
429
430 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
431 fn observe_nondet(
432 &mut self,
433 trusted: bool,
434 location: &LocationId,
435 in_ident: syn::Ident,
436 in_kind: &CollectionKind,
437 out_ident: &syn::Ident,
438 out_kind: &CollectionKind,
439 op_meta: &HydroIrOpMetadata,
440 );
441
442 #[expect(clippy::too_many_arguments, reason = "TODO")]
443 fn create_network(
444 &mut self,
445 from: &LocationId,
446 to: &LocationId,
447 input_ident: syn::Ident,
448 out_ident: &syn::Ident,
449 serialize: Option<&DebugExpr>,
450 sink: syn::Expr,
451 source: syn::Expr,
452 deserialize: Option<&DebugExpr>,
453 tag_id: usize,
454 networking_info: &crate::networking::NetworkingInfo,
455 );
456
457 fn create_external_source(
458 &mut self,
459 on: &LocationId,
460 source_expr: syn::Expr,
461 out_ident: &syn::Ident,
462 deserialize: Option<&DebugExpr>,
463 tag_id: usize,
464 );
465
466 fn create_external_output(
467 &mut self,
468 on: &LocationId,
469 sink_expr: syn::Expr,
470 input_ident: &syn::Ident,
471 serialize: Option<&DebugExpr>,
472 tag_id: usize,
473 );
474}
475
476#[cfg(feature = "build")]
477impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
478 fn singleton_intermediates(&self) -> bool {
479 false
480 }
481
482 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
483 self.entry(location.root().key())
484 .expect("location was removed")
485 .or_default()
486 }
487
488 fn batch(
489 &mut self,
490 in_ident: syn::Ident,
491 in_location: &LocationId,
492 in_kind: &CollectionKind,
493 out_ident: &syn::Ident,
494 _out_location: &LocationId,
495 _op_meta: &HydroIrOpMetadata,
496 ) {
497 let builder = self.get_dfir_mut(in_location.root());
498 if in_kind.is_bounded()
499 && matches!(
500 in_kind,
501 CollectionKind::Singleton { .. }
502 | CollectionKind::Optional { .. }
503 | CollectionKind::KeyedSingleton { .. }
504 )
505 {
506 assert!(in_location.is_top_level());
507 builder.add_dfir(
508 parse_quote! {
509 #out_ident = #in_ident -> persist::<'static>();
510 },
511 None,
512 None,
513 );
514 } else {
515 builder.add_dfir(
516 parse_quote! {
517 #out_ident = #in_ident;
518 },
519 None,
520 None,
521 );
522 }
523 }
524
525 fn yield_from_tick(
526 &mut self,
527 in_ident: syn::Ident,
528 in_location: &LocationId,
529 _in_kind: &CollectionKind,
530 out_ident: &syn::Ident,
531 _out_location: &LocationId,
532 ) {
533 let builder = self.get_dfir_mut(in_location.root());
534 builder.add_dfir(
535 parse_quote! {
536 #out_ident = #in_ident;
537 },
538 None,
539 None,
540 );
541 }
542
543 fn begin_atomic(
544 &mut self,
545 in_ident: syn::Ident,
546 in_location: &LocationId,
547 _in_kind: &CollectionKind,
548 out_ident: &syn::Ident,
549 _out_location: &LocationId,
550 _op_meta: &HydroIrOpMetadata,
551 ) {
552 let builder = self.get_dfir_mut(in_location.root());
553 builder.add_dfir(
554 parse_quote! {
555 #out_ident = #in_ident;
556 },
557 None,
558 None,
559 );
560 }
561
562 fn end_atomic(
563 &mut self,
564 in_ident: syn::Ident,
565 in_location: &LocationId,
566 _in_kind: &CollectionKind,
567 out_ident: &syn::Ident,
568 ) {
569 let builder = self.get_dfir_mut(in_location.root());
570 builder.add_dfir(
571 parse_quote! {
572 #out_ident = #in_ident;
573 },
574 None,
575 None,
576 );
577 }
578
579 fn observe_nondet(
580 &mut self,
581 _trusted: bool,
582 location: &LocationId,
583 in_ident: syn::Ident,
584 _in_kind: &CollectionKind,
585 out_ident: &syn::Ident,
586 _out_kind: &CollectionKind,
587 _op_meta: &HydroIrOpMetadata,
588 ) {
589 let builder = self.get_dfir_mut(location);
590 builder.add_dfir(
591 parse_quote! {
592 #out_ident = #in_ident;
593 },
594 None,
595 None,
596 );
597 }
598
599 fn create_network(
600 &mut self,
601 from: &LocationId,
602 to: &LocationId,
603 input_ident: syn::Ident,
604 out_ident: &syn::Ident,
605 serialize: Option<&DebugExpr>,
606 sink: syn::Expr,
607 source: syn::Expr,
608 deserialize: Option<&DebugExpr>,
609 tag_id: usize,
610 _networking_info: &crate::networking::NetworkingInfo,
611 ) {
612 let sender_builder = self.get_dfir_mut(from);
613 if let Some(serialize_pipeline) = serialize {
614 sender_builder.add_dfir(
615 parse_quote! {
616 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
617 },
618 None,
619 Some(&format!("send{}", tag_id)),
621 );
622 } else {
623 sender_builder.add_dfir(
624 parse_quote! {
625 #input_ident -> dest_sink(#sink);
626 },
627 None,
628 Some(&format!("send{}", tag_id)),
629 );
630 }
631
632 let receiver_builder = self.get_dfir_mut(to);
633 if let Some(deserialize_pipeline) = deserialize {
634 receiver_builder.add_dfir(
635 parse_quote! {
636 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
637 },
638 None,
639 Some(&format!("recv{}", tag_id)),
640 );
641 } else {
642 receiver_builder.add_dfir(
643 parse_quote! {
644 #out_ident = source_stream(#source);
645 },
646 None,
647 Some(&format!("recv{}", tag_id)),
648 );
649 }
650 }
651
652 fn create_external_source(
653 &mut self,
654 on: &LocationId,
655 source_expr: syn::Expr,
656 out_ident: &syn::Ident,
657 deserialize: Option<&DebugExpr>,
658 tag_id: usize,
659 ) {
660 let receiver_builder = self.get_dfir_mut(on);
661 if let Some(deserialize_pipeline) = deserialize {
662 receiver_builder.add_dfir(
663 parse_quote! {
664 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
665 },
666 None,
667 Some(&format!("recv{}", tag_id)),
668 );
669 } else {
670 receiver_builder.add_dfir(
671 parse_quote! {
672 #out_ident = source_stream(#source_expr);
673 },
674 None,
675 Some(&format!("recv{}", tag_id)),
676 );
677 }
678 }
679
680 fn create_external_output(
681 &mut self,
682 on: &LocationId,
683 sink_expr: syn::Expr,
684 input_ident: &syn::Ident,
685 serialize: Option<&DebugExpr>,
686 tag_id: usize,
687 ) {
688 let sender_builder = self.get_dfir_mut(on);
689 if let Some(serialize_fn) = serialize {
690 sender_builder.add_dfir(
691 parse_quote! {
692 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
693 },
694 None,
695 Some(&format!("send{}", tag_id)),
697 );
698 } else {
699 sender_builder.add_dfir(
700 parse_quote! {
701 #input_ident -> dest_sink(#sink_expr);
702 },
703 None,
704 Some(&format!("send{}", tag_id)),
705 );
706 }
707 }
708}
709
710#[cfg(feature = "build")]
711pub enum BuildersOrCallback<'a, L, N>
712where
713 L: FnMut(&mut HydroRoot, &mut usize),
714 N: FnMut(&mut HydroNode, &mut usize),
715{
716 Builders(&'a mut dyn DfirBuilder),
717 Callback(L, N),
718}
719
720#[derive(Debug, Hash, serde::Serialize)]
724pub enum HydroRoot {
725 ForEach {
726 f: DebugExpr,
727 input: Box<HydroNode>,
728 op_metadata: HydroIrOpMetadata,
729 },
730 SendExternal {
731 to_external_key: LocationKey,
732 to_port_id: ExternalPortId,
733 to_many: bool,
734 unpaired: bool,
735 serialize_fn: Option<DebugExpr>,
736 instantiate_fn: DebugInstantiate,
737 input: Box<HydroNode>,
738 op_metadata: HydroIrOpMetadata,
739 },
740 DestSink {
741 sink: DebugExpr,
742 input: Box<HydroNode>,
743 op_metadata: HydroIrOpMetadata,
744 },
745 CycleSink {
746 cycle_id: CycleId,
747 input: Box<HydroNode>,
748 op_metadata: HydroIrOpMetadata,
749 },
750 EmbeddedOutput {
751 #[serde(serialize_with = "serialize_ident")]
752 ident: syn::Ident,
753 input: Box<HydroNode>,
754 op_metadata: HydroIrOpMetadata,
755 },
756 Null {
757 input: Box<HydroNode>,
758 op_metadata: HydroIrOpMetadata,
759 },
760}
761
762impl HydroRoot {
763 #[cfg(feature = "build")]
764 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
765 pub fn compile_network<'a, D>(
766 &mut self,
767 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
768 seen_tees: &mut SeenSharedNodes,
769 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
770 processes: &SparseSecondaryMap<LocationKey, D::Process>,
771 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
772 externals: &SparseSecondaryMap<LocationKey, D::External>,
773 env: &mut D::InstantiateEnv,
774 ) where
775 D: Deploy<'a>,
776 {
777 let refcell_extra_stmts = RefCell::new(extra_stmts);
778 let refcell_env = RefCell::new(env);
779 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
780 self.transform_bottom_up(
781 &mut |l| {
782 if let HydroRoot::SendExternal {
783 input,
784 to_external_key,
785 to_port_id,
786 to_many,
787 unpaired,
788 instantiate_fn,
789 ..
790 } = l
791 {
792 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
793 DebugInstantiate::Building => {
794 let to_node = externals
795 .get(*to_external_key)
796 .unwrap_or_else(|| {
797 panic!("A external used in the graph was not instantiated: {}", to_external_key)
798 })
799 .clone();
800
801 match input.metadata().location_id.root() {
802 &LocationId::Process(process_key) => {
803 if *to_many {
804 (
805 (
806 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
807 parse_quote!(DUMMY),
808 ),
809 Box::new(|| {}) as Box<dyn FnOnce()>,
810 )
811 } else {
812 let from_node = processes
813 .get(process_key)
814 .unwrap_or_else(|| {
815 panic!("A process used in the graph was not instantiated: {}", process_key)
816 })
817 .clone();
818
819 let sink_port = from_node.next_port();
820 let source_port = to_node.next_port();
821
822 if *unpaired {
823 use stageleft::quote_type;
824 use tokio_util::codec::LengthDelimitedCodec;
825
826 to_node.register(*to_port_id, source_port.clone());
827
828 let _ = D::e2o_source(
829 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
830 &to_node, &source_port,
831 &from_node, &sink_port,
832 "e_type::<LengthDelimitedCodec>(),
833 format!("{}_{}", *to_external_key, *to_port_id)
834 );
835 }
836
837 (
838 (
839 D::o2e_sink(
840 &from_node,
841 &sink_port,
842 &to_node,
843 &source_port,
844 format!("{}_{}", *to_external_key, *to_port_id)
845 ),
846 parse_quote!(DUMMY),
847 ),
848 if *unpaired {
849 D::e2o_connect(
850 &to_node,
851 &source_port,
852 &from_node,
853 &sink_port,
854 *to_many,
855 NetworkHint::Auto,
856 )
857 } else {
858 Box::new(|| {}) as Box<dyn FnOnce()>
859 },
860 )
861 }
862 }
863 LocationId::Cluster(cluster_key) => {
864 let from_node = clusters
865 .get(*cluster_key)
866 .unwrap_or_else(|| {
867 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
868 })
869 .clone();
870
871 let sink_port = from_node.next_port();
872 let source_port = to_node.next_port();
873
874 if *unpaired {
875 to_node.register(*to_port_id, source_port.clone());
876 }
877
878 (
879 (
880 D::m2e_sink(
881 &from_node,
882 &sink_port,
883 &to_node,
884 &source_port,
885 format!("{}_{}", *to_external_key, *to_port_id)
886 ),
887 parse_quote!(DUMMY),
888 ),
889 Box::new(|| {}) as Box<dyn FnOnce()>,
890 )
891 }
892 _ => panic!()
893 }
894 },
895
896 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
897 };
898
899 *instantiate_fn = DebugInstantiateFinalized {
900 sink: sink_expr,
901 source: source_expr,
902 connect_fn: Some(connect_fn),
903 }
904 .into();
905 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
906 let element_type = match &input.metadata().collection_kind {
907 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
908 _ => panic!("Embedded output must have Stream collection kind"),
909 };
910 let location_key = match input.metadata().location_id.root() {
911 LocationId::Process(key) | LocationId::Cluster(key) => *key,
912 _ => panic!("Embedded output must be on a process or cluster"),
913 };
914 D::register_embedded_output(
915 &mut refcell_env.borrow_mut(),
916 location_key,
917 ident,
918 &element_type,
919 );
920 }
921 },
922 &mut |n| {
923 if let HydroNode::Network {
924 name,
925 networking_info,
926 input,
927 instantiate_fn,
928 metadata,
929 ..
930 } = n
931 {
932 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
933 DebugInstantiate::Building => instantiate_network::<D>(
934 &mut refcell_env.borrow_mut(),
935 input.metadata().location_id.root(),
936 metadata.location_id.root(),
937 processes,
938 clusters,
939 name.as_deref(),
940 networking_info,
941 ),
942
943 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
944 };
945
946 *instantiate_fn = DebugInstantiateFinalized {
947 sink: sink_expr,
948 source: source_expr,
949 connect_fn: Some(connect_fn),
950 }
951 .into();
952 } else if let HydroNode::ExternalInput {
953 from_external_key,
954 from_port_id,
955 from_many,
956 codec_type,
957 port_hint,
958 instantiate_fn,
959 metadata,
960 ..
961 } = n
962 {
963 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
964 DebugInstantiate::Building => {
965 let from_node = externals
966 .get(*from_external_key)
967 .unwrap_or_else(|| {
968 panic!(
969 "A external used in the graph was not instantiated: {}",
970 from_external_key,
971 )
972 })
973 .clone();
974
975 match metadata.location_id.root() {
976 &LocationId::Process(process_key) => {
977 let to_node = processes
978 .get(process_key)
979 .unwrap_or_else(|| {
980 panic!("A process used in the graph was not instantiated: {}", process_key)
981 })
982 .clone();
983
984 let sink_port = from_node.next_port();
985 let source_port = to_node.next_port();
986
987 from_node.register(*from_port_id, sink_port.clone());
988
989 (
990 (
991 parse_quote!(DUMMY),
992 if *from_many {
993 D::e2o_many_source(
994 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
995 &to_node, &source_port,
996 codec_type.0.as_ref(),
997 format!("{}_{}", *from_external_key, *from_port_id)
998 )
999 } else {
1000 D::e2o_source(
1001 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1002 &from_node, &sink_port,
1003 &to_node, &source_port,
1004 codec_type.0.as_ref(),
1005 format!("{}_{}", *from_external_key, *from_port_id)
1006 )
1007 },
1008 ),
1009 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1010 )
1011 }
1012 LocationId::Cluster(cluster_key) => {
1013 let to_node = clusters
1014 .get(*cluster_key)
1015 .unwrap_or_else(|| {
1016 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1017 })
1018 .clone();
1019
1020 let sink_port = from_node.next_port();
1021 let source_port = to_node.next_port();
1022
1023 from_node.register(*from_port_id, sink_port.clone());
1024
1025 (
1026 (
1027 parse_quote!(DUMMY),
1028 D::e2m_source(
1029 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1030 &from_node, &sink_port,
1031 &to_node, &source_port,
1032 codec_type.0.as_ref(),
1033 format!("{}_{}", *from_external_key, *from_port_id)
1034 ),
1035 ),
1036 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1037 )
1038 }
1039 _ => panic!()
1040 }
1041 },
1042
1043 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1044 };
1045
1046 *instantiate_fn = DebugInstantiateFinalized {
1047 sink: sink_expr,
1048 source: source_expr,
1049 connect_fn: Some(connect_fn),
1050 }
1051 .into();
1052 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1053 let element_type = match &metadata.collection_kind {
1054 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1055 _ => panic!("Embedded source must have Stream collection kind"),
1056 };
1057 let location_key = match metadata.location_id.root() {
1058 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1059 _ => panic!("Embedded source must be on a process or cluster"),
1060 };
1061 D::register_embedded_stream_input(
1062 &mut refcell_env.borrow_mut(),
1063 location_key,
1064 ident,
1065 &element_type,
1066 );
1067 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1068 let element_type = match &metadata.collection_kind {
1069 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1070 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1071 };
1072 let location_key = match metadata.location_id.root() {
1073 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1074 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1075 };
1076 D::register_embedded_singleton_input(
1077 &mut refcell_env.borrow_mut(),
1078 location_key,
1079 ident,
1080 &element_type,
1081 );
1082 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1083 match state {
1084 ClusterMembersState::Uninit => {
1085 let at_location = metadata.location_id.root().clone();
1086 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1087 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1088 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1090 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1091 &(),
1092 );
1093 *state = ClusterMembersState::Stream(expr.into());
1094 } else {
1095 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1097 }
1098 }
1099 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1100 panic!("cluster members already finalized");
1101 }
1102 }
1103 }
1104 },
1105 seen_tees,
1106 false,
1107 );
1108 }
1109
1110 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1111 self.transform_bottom_up(
1112 &mut |l| {
1113 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1114 match instantiate_fn {
1115 DebugInstantiate::Building => panic!("network not built"),
1116
1117 DebugInstantiate::Finalized(finalized) => {
1118 (finalized.connect_fn.take().unwrap())();
1119 }
1120 }
1121 }
1122 },
1123 &mut |n| {
1124 if let HydroNode::Network { instantiate_fn, .. }
1125 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1126 {
1127 match instantiate_fn {
1128 DebugInstantiate::Building => panic!("network not built"),
1129
1130 DebugInstantiate::Finalized(finalized) => {
1131 (finalized.connect_fn.take().unwrap())();
1132 }
1133 }
1134 }
1135 },
1136 seen_tees,
1137 false,
1138 );
1139 }
1140
1141 pub fn transform_bottom_up(
1142 &mut self,
1143 transform_root: &mut impl FnMut(&mut HydroRoot),
1144 transform_node: &mut impl FnMut(&mut HydroNode),
1145 seen_tees: &mut SeenSharedNodes,
1146 check_well_formed: bool,
1147 ) {
1148 self.transform_children(
1149 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1150 seen_tees,
1151 );
1152
1153 transform_root(self);
1154 }
1155
1156 pub fn transform_children(
1157 &mut self,
1158 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1159 seen_tees: &mut SeenSharedNodes,
1160 ) {
1161 match self {
1162 HydroRoot::ForEach { input, .. }
1163 | HydroRoot::SendExternal { input, .. }
1164 | HydroRoot::DestSink { input, .. }
1165 | HydroRoot::CycleSink { input, .. }
1166 | HydroRoot::EmbeddedOutput { input, .. }
1167 | HydroRoot::Null { input, .. } => {
1168 transform(input, seen_tees);
1169 }
1170 }
1171 }
1172
1173 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1174 match self {
1175 HydroRoot::ForEach {
1176 f,
1177 input,
1178 op_metadata,
1179 } => HydroRoot::ForEach {
1180 f: f.clone(),
1181 input: Box::new(input.deep_clone(seen_tees)),
1182 op_metadata: op_metadata.clone(),
1183 },
1184 HydroRoot::SendExternal {
1185 to_external_key,
1186 to_port_id,
1187 to_many,
1188 unpaired,
1189 serialize_fn,
1190 instantiate_fn,
1191 input,
1192 op_metadata,
1193 } => HydroRoot::SendExternal {
1194 to_external_key: *to_external_key,
1195 to_port_id: *to_port_id,
1196 to_many: *to_many,
1197 unpaired: *unpaired,
1198 serialize_fn: serialize_fn.clone(),
1199 instantiate_fn: instantiate_fn.clone(),
1200 input: Box::new(input.deep_clone(seen_tees)),
1201 op_metadata: op_metadata.clone(),
1202 },
1203 HydroRoot::DestSink {
1204 sink,
1205 input,
1206 op_metadata,
1207 } => HydroRoot::DestSink {
1208 sink: sink.clone(),
1209 input: Box::new(input.deep_clone(seen_tees)),
1210 op_metadata: op_metadata.clone(),
1211 },
1212 HydroRoot::CycleSink {
1213 cycle_id,
1214 input,
1215 op_metadata,
1216 } => HydroRoot::CycleSink {
1217 cycle_id: *cycle_id,
1218 input: Box::new(input.deep_clone(seen_tees)),
1219 op_metadata: op_metadata.clone(),
1220 },
1221 HydroRoot::EmbeddedOutput {
1222 ident,
1223 input,
1224 op_metadata,
1225 } => HydroRoot::EmbeddedOutput {
1226 ident: ident.clone(),
1227 input: Box::new(input.deep_clone(seen_tees)),
1228 op_metadata: op_metadata.clone(),
1229 },
1230 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1231 input: Box::new(input.deep_clone(seen_tees)),
1232 op_metadata: op_metadata.clone(),
1233 },
1234 }
1235 }
1236
1237 #[cfg(feature = "build")]
1238 pub fn emit(
1239 &mut self,
1240 graph_builders: &mut dyn DfirBuilder,
1241 seen_tees: &mut SeenSharedNodes,
1242 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1243 next_stmt_id: &mut usize,
1244 ) {
1245 self.emit_core(
1246 &mut BuildersOrCallback::<
1247 fn(&mut HydroRoot, &mut usize),
1248 fn(&mut HydroNode, &mut usize),
1249 >::Builders(graph_builders),
1250 seen_tees,
1251 built_tees,
1252 next_stmt_id,
1253 );
1254 }
1255
1256 #[cfg(feature = "build")]
1257 pub fn emit_core(
1258 &mut self,
1259 builders_or_callback: &mut BuildersOrCallback<
1260 impl FnMut(&mut HydroRoot, &mut usize),
1261 impl FnMut(&mut HydroNode, &mut usize),
1262 >,
1263 seen_tees: &mut SeenSharedNodes,
1264 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1265 next_stmt_id: &mut usize,
1266 ) {
1267 match self {
1268 HydroRoot::ForEach { f, input, .. } => {
1269 let input_ident =
1270 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1271
1272 match builders_or_callback {
1273 BuildersOrCallback::Builders(graph_builders) => {
1274 graph_builders
1275 .get_dfir_mut(&input.metadata().location_id)
1276 .add_dfir(
1277 parse_quote! {
1278 #input_ident -> for_each(#f);
1279 },
1280 None,
1281 Some(&next_stmt_id.to_string()),
1282 );
1283 }
1284 BuildersOrCallback::Callback(leaf_callback, _) => {
1285 leaf_callback(self, next_stmt_id);
1286 }
1287 }
1288
1289 *next_stmt_id += 1;
1290 }
1291
1292 HydroRoot::SendExternal {
1293 serialize_fn,
1294 instantiate_fn,
1295 input,
1296 ..
1297 } => {
1298 let input_ident =
1299 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1300
1301 match builders_or_callback {
1302 BuildersOrCallback::Builders(graph_builders) => {
1303 let (sink_expr, _) = match instantiate_fn {
1304 DebugInstantiate::Building => (
1305 syn::parse_quote!(DUMMY_SINK),
1306 syn::parse_quote!(DUMMY_SOURCE),
1307 ),
1308
1309 DebugInstantiate::Finalized(finalized) => {
1310 (finalized.sink.clone(), finalized.source.clone())
1311 }
1312 };
1313
1314 graph_builders.create_external_output(
1315 &input.metadata().location_id,
1316 sink_expr,
1317 &input_ident,
1318 serialize_fn.as_ref(),
1319 *next_stmt_id,
1320 );
1321 }
1322 BuildersOrCallback::Callback(leaf_callback, _) => {
1323 leaf_callback(self, next_stmt_id);
1324 }
1325 }
1326
1327 *next_stmt_id += 1;
1328 }
1329
1330 HydroRoot::DestSink { sink, input, .. } => {
1331 let input_ident =
1332 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1333
1334 match builders_or_callback {
1335 BuildersOrCallback::Builders(graph_builders) => {
1336 graph_builders
1337 .get_dfir_mut(&input.metadata().location_id)
1338 .add_dfir(
1339 parse_quote! {
1340 #input_ident -> dest_sink(#sink);
1341 },
1342 None,
1343 Some(&next_stmt_id.to_string()),
1344 );
1345 }
1346 BuildersOrCallback::Callback(leaf_callback, _) => {
1347 leaf_callback(self, next_stmt_id);
1348 }
1349 }
1350
1351 *next_stmt_id += 1;
1352 }
1353
1354 HydroRoot::CycleSink {
1355 cycle_id, input, ..
1356 } => {
1357 let input_ident =
1358 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1359
1360 match builders_or_callback {
1361 BuildersOrCallback::Builders(graph_builders) => {
1362 let elem_type: syn::Type = match &input.metadata().collection_kind {
1363 CollectionKind::KeyedSingleton {
1364 key_type,
1365 value_type,
1366 ..
1367 }
1368 | CollectionKind::KeyedStream {
1369 key_type,
1370 value_type,
1371 ..
1372 } => {
1373 parse_quote!((#key_type, #value_type))
1374 }
1375 CollectionKind::Stream { element_type, .. }
1376 | CollectionKind::Singleton { element_type, .. }
1377 | CollectionKind::Optional { element_type, .. } => {
1378 parse_quote!(#element_type)
1379 }
1380 };
1381
1382 let cycle_id_ident = cycle_id.as_ident();
1383 graph_builders
1384 .get_dfir_mut(&input.metadata().location_id)
1385 .add_dfir(
1386 parse_quote! {
1387 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1388 },
1389 None,
1390 None,
1391 );
1392 }
1393 BuildersOrCallback::Callback(_, _) => {}
1395 }
1396 }
1397
1398 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1399 let input_ident =
1400 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1401
1402 match builders_or_callback {
1403 BuildersOrCallback::Builders(graph_builders) => {
1404 graph_builders
1405 .get_dfir_mut(&input.metadata().location_id)
1406 .add_dfir(
1407 parse_quote! {
1408 #input_ident -> for_each(&mut #ident);
1409 },
1410 None,
1411 Some(&next_stmt_id.to_string()),
1412 );
1413 }
1414 BuildersOrCallback::Callback(leaf_callback, _) => {
1415 leaf_callback(self, next_stmt_id);
1416 }
1417 }
1418
1419 *next_stmt_id += 1;
1420 }
1421
1422 HydroRoot::Null { input, .. } => {
1423 let input_ident =
1424 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1425
1426 match builders_or_callback {
1427 BuildersOrCallback::Builders(graph_builders) => {
1428 graph_builders
1429 .get_dfir_mut(&input.metadata().location_id)
1430 .add_dfir(
1431 parse_quote! {
1432 #input_ident -> for_each(|_| {});
1433 },
1434 None,
1435 Some(&next_stmt_id.to_string()),
1436 );
1437 }
1438 BuildersOrCallback::Callback(leaf_callback, _) => {
1439 leaf_callback(self, next_stmt_id);
1440 }
1441 }
1442
1443 *next_stmt_id += 1;
1444 }
1445 }
1446 }
1447
1448 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1449 match self {
1450 HydroRoot::ForEach { op_metadata, .. }
1451 | HydroRoot::SendExternal { op_metadata, .. }
1452 | HydroRoot::DestSink { op_metadata, .. }
1453 | HydroRoot::CycleSink { op_metadata, .. }
1454 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1455 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1456 }
1457 }
1458
1459 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1460 match self {
1461 HydroRoot::ForEach { op_metadata, .. }
1462 | HydroRoot::SendExternal { op_metadata, .. }
1463 | HydroRoot::DestSink { op_metadata, .. }
1464 | HydroRoot::CycleSink { op_metadata, .. }
1465 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1466 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1467 }
1468 }
1469
1470 pub fn input(&self) -> &HydroNode {
1471 match self {
1472 HydroRoot::ForEach { input, .. }
1473 | HydroRoot::SendExternal { input, .. }
1474 | HydroRoot::DestSink { input, .. }
1475 | HydroRoot::CycleSink { input, .. }
1476 | HydroRoot::EmbeddedOutput { input, .. }
1477 | HydroRoot::Null { input, .. } => input,
1478 }
1479 }
1480
1481 pub fn input_metadata(&self) -> &HydroIrMetadata {
1482 self.input().metadata()
1483 }
1484
1485 pub fn print_root(&self) -> String {
1486 match self {
1487 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1488 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1489 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1490 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1491 HydroRoot::EmbeddedOutput { ident, .. } => {
1492 format!("EmbeddedOutput({})", ident)
1493 }
1494 HydroRoot::Null { .. } => "Null".to_owned(),
1495 }
1496 }
1497
1498 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1499 match self {
1500 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1501 transform(f);
1502 }
1503 HydroRoot::SendExternal { .. }
1504 | HydroRoot::CycleSink { .. }
1505 | HydroRoot::EmbeddedOutput { .. }
1506 | HydroRoot::Null { .. } => {}
1507 }
1508 }
1509}
1510
1511#[cfg(feature = "build")]
1512fn tick_of(loc: &LocationId) -> Option<ClockId> {
1513 match loc {
1514 LocationId::Tick(id, _) => Some(*id),
1515 LocationId::Atomic(inner) => tick_of(inner),
1516 _ => None,
1517 }
1518}
1519
1520#[cfg(feature = "build")]
1521fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1522 match loc {
1523 LocationId::Tick(id, inner) => {
1524 *id = uf_find(uf, *id);
1525 remap_location(inner, uf);
1526 }
1527 LocationId::Atomic(inner) => {
1528 remap_location(inner, uf);
1529 }
1530 LocationId::Process(_) | LocationId::Cluster(_) => {}
1531 }
1532}
1533
1534#[cfg(feature = "build")]
1535fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1536 let p = *parent.get(&x).unwrap_or(&x);
1537 if p == x {
1538 return x;
1539 }
1540 let root = uf_find(parent, p);
1541 parent.insert(x, root);
1542 root
1543}
1544
1545#[cfg(feature = "build")]
1546fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1547 let ra = uf_find(parent, a);
1548 let rb = uf_find(parent, b);
1549 if ra != rb {
1550 parent.insert(ra, rb);
1551 }
1552}
1553
1554#[cfg(feature = "build")]
1558pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1559 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1560
1561 transform_bottom_up(
1563 ir,
1564 &mut |_| {},
1565 &mut |node: &mut HydroNode| {
1566 if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1567 node
1568 && let (Some(a), Some(b)) = (
1569 tick_of(&inner.metadata().location_id),
1570 tick_of(&metadata.location_id),
1571 )
1572 {
1573 uf_union(&mut uf, a, b);
1574 }
1575 },
1576 false,
1577 );
1578
1579 transform_bottom_up(
1581 ir,
1582 &mut |_| {},
1583 &mut |node: &mut HydroNode| {
1584 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1585 },
1586 false,
1587 );
1588}
1589
1590#[cfg(feature = "build")]
1591pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1592 let mut builders = SecondaryMap::new();
1593 let mut seen_tees = HashMap::new();
1594 let mut built_tees = HashMap::new();
1595 let mut next_stmt_id = 0;
1596 for leaf in ir {
1597 leaf.emit(
1598 &mut builders,
1599 &mut seen_tees,
1600 &mut built_tees,
1601 &mut next_stmt_id,
1602 );
1603 }
1604 builders
1605}
1606
1607#[cfg(feature = "build")]
1608pub fn traverse_dfir(
1609 ir: &mut [HydroRoot],
1610 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1611 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1612) {
1613 let mut seen_tees = HashMap::new();
1614 let mut built_tees = HashMap::new();
1615 let mut next_stmt_id = 0;
1616 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1617 ir.iter_mut().for_each(|leaf| {
1618 leaf.emit_core(
1619 &mut callback,
1620 &mut seen_tees,
1621 &mut built_tees,
1622 &mut next_stmt_id,
1623 );
1624 });
1625}
1626
1627pub fn transform_bottom_up(
1628 ir: &mut [HydroRoot],
1629 transform_root: &mut impl FnMut(&mut HydroRoot),
1630 transform_node: &mut impl FnMut(&mut HydroNode),
1631 check_well_formed: bool,
1632) {
1633 let mut seen_tees = HashMap::new();
1634 ir.iter_mut().for_each(|leaf| {
1635 leaf.transform_bottom_up(
1636 transform_root,
1637 transform_node,
1638 &mut seen_tees,
1639 check_well_formed,
1640 );
1641 });
1642}
1643
1644pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1645 let mut seen_tees = HashMap::new();
1646 ir.iter()
1647 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1648 .collect()
1649}
1650
1651type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1652thread_local! {
1653 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1654 static SERIALIZED_SHARED: PrintedTees
1658 = const { RefCell::new(None) };
1659}
1660
1661pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1662 PRINTED_TEES.with(|printed_tees| {
1663 let mut printed_tees_mut = printed_tees.borrow_mut();
1664 *printed_tees_mut = Some((0, HashMap::new()));
1665 drop(printed_tees_mut);
1666
1667 let ret = f();
1668
1669 let mut printed_tees_mut = printed_tees.borrow_mut();
1670 *printed_tees_mut = None;
1671
1672 ret
1673 })
1674}
1675
1676pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1681 let _guard = SerializedSharedGuard::enter();
1682 f()
1683}
1684
1685struct SerializedSharedGuard {
1688 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1689}
1690
1691impl SerializedSharedGuard {
1692 fn enter() -> Self {
1693 let previous = SERIALIZED_SHARED.with(|cell| {
1694 let mut guard = cell.borrow_mut();
1695 guard.replace((0, HashMap::new()))
1696 });
1697 Self { previous }
1698 }
1699}
1700
1701impl Drop for SerializedSharedGuard {
1702 fn drop(&mut self) {
1703 SERIALIZED_SHARED.with(|cell| {
1704 *cell.borrow_mut() = self.previous.take();
1705 });
1706 }
1707}
1708
1709pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1710
1711impl serde::Serialize for SharedNode {
1712 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1723 SERIALIZED_SHARED.with(|cell| {
1724 let mut guard = cell.borrow_mut();
1725 let state = guard.as_mut().ok_or_else(|| {
1727 serde::ser::Error::custom(
1728 "SharedNode serialization requires an active serialize_dedup_shared scope",
1729 )
1730 })?;
1731 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1732
1733 if let Some(&id) = state.1.get(&ptr) {
1734 drop(guard);
1735 use serde::ser::SerializeMap;
1736 let mut map = serializer.serialize_map(Some(1))?;
1737 map.serialize_entry("$shared_ref", &id)?;
1738 map.end()
1739 } else {
1740 let id = state.0;
1741 state.0 += 1;
1742 state.1.insert(ptr, id);
1743 drop(guard);
1744
1745 use serde::ser::SerializeMap;
1746 let mut map = serializer.serialize_map(Some(2))?;
1747 map.serialize_entry("$shared", &id)?;
1748 map.serialize_entry("node", &*self.0.borrow())?;
1749 map.end()
1750 }
1751 })
1752 }
1753}
1754
1755impl SharedNode {
1756 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1757 Rc::as_ptr(&self.0)
1758 }
1759}
1760
1761impl Debug for SharedNode {
1762 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1763 PRINTED_TEES.with(|printed_tees| {
1764 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1765 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1766
1767 if let Some(printed_tees_mut) = printed_tees_mut {
1768 if let Some(existing) = printed_tees_mut
1769 .1
1770 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1771 {
1772 write!(f, "<shared {}>", existing)
1773 } else {
1774 let next_id = printed_tees_mut.0;
1775 printed_tees_mut.0 += 1;
1776 printed_tees_mut
1777 .1
1778 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1779 drop(printed_tees_mut_borrow);
1780 write!(f, "<shared {}>: ", next_id)?;
1781 Debug::fmt(&self.0.borrow(), f)
1782 }
1783 } else {
1784 drop(printed_tees_mut_borrow);
1785 write!(f, "<shared>: ")?;
1786 Debug::fmt(&self.0.borrow(), f)
1787 }
1788 })
1789 }
1790}
1791
1792impl Hash for SharedNode {
1793 fn hash<H: Hasher>(&self, state: &mut H) {
1794 self.0.borrow_mut().hash(state);
1795 }
1796}
1797
1798#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
1799pub enum BoundKind {
1800 Unbounded,
1801 Bounded,
1802}
1803
1804#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
1805pub enum StreamOrder {
1806 NoOrder,
1807 TotalOrder,
1808}
1809
1810#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
1811pub enum StreamRetry {
1812 AtLeastOnce,
1813 ExactlyOnce,
1814}
1815
1816#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
1817pub enum KeyedSingletonBoundKind {
1818 Unbounded,
1819 MonotonicValue,
1820 BoundedValue,
1821 Bounded,
1822}
1823
1824#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
1825pub enum SingletonBoundKind {
1826 Unbounded,
1827 Monotonic,
1828 Bounded,
1829}
1830
1831#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
1832pub enum CollectionKind {
1833 Stream {
1834 bound: BoundKind,
1835 order: StreamOrder,
1836 retry: StreamRetry,
1837 element_type: DebugType,
1838 },
1839 Singleton {
1840 bound: SingletonBoundKind,
1841 element_type: DebugType,
1842 },
1843 Optional {
1844 bound: BoundKind,
1845 element_type: DebugType,
1846 },
1847 KeyedStream {
1848 bound: BoundKind,
1849 value_order: StreamOrder,
1850 value_retry: StreamRetry,
1851 key_type: DebugType,
1852 value_type: DebugType,
1853 },
1854 KeyedSingleton {
1855 bound: KeyedSingletonBoundKind,
1856 key_type: DebugType,
1857 value_type: DebugType,
1858 },
1859}
1860
1861impl CollectionKind {
1862 pub fn is_bounded(&self) -> bool {
1863 matches!(
1864 self,
1865 CollectionKind::Stream {
1866 bound: BoundKind::Bounded,
1867 ..
1868 } | CollectionKind::Singleton {
1869 bound: SingletonBoundKind::Bounded,
1870 ..
1871 } | CollectionKind::Optional {
1872 bound: BoundKind::Bounded,
1873 ..
1874 } | CollectionKind::KeyedStream {
1875 bound: BoundKind::Bounded,
1876 ..
1877 } | CollectionKind::KeyedSingleton {
1878 bound: KeyedSingletonBoundKind::Bounded,
1879 ..
1880 }
1881 )
1882 }
1883}
1884
1885#[derive(Clone, serde::Serialize)]
1886pub struct HydroIrMetadata {
1887 pub location_id: LocationId,
1888 pub collection_kind: CollectionKind,
1889 pub cardinality: Option<usize>,
1890 pub tag: Option<String>,
1891 pub op: HydroIrOpMetadata,
1892}
1893
1894impl Hash for HydroIrMetadata {
1896 fn hash<H: Hasher>(&self, _: &mut H) {}
1897}
1898
1899impl PartialEq for HydroIrMetadata {
1900 fn eq(&self, _: &Self) -> bool {
1901 true
1902 }
1903}
1904
1905impl Eq for HydroIrMetadata {}
1906
1907impl Debug for HydroIrMetadata {
1908 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1909 f.debug_struct("HydroIrMetadata")
1910 .field("location_id", &self.location_id)
1911 .field("collection_kind", &self.collection_kind)
1912 .finish()
1913 }
1914}
1915
1916#[derive(Clone, serde::Serialize)]
1919pub struct HydroIrOpMetadata {
1920 #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
1921 pub backtrace: Backtrace,
1922 pub cpu_usage: Option<f64>,
1923 pub network_recv_cpu_usage: Option<f64>,
1924 pub id: Option<usize>,
1925}
1926
1927impl HydroIrOpMetadata {
1928 #[expect(
1929 clippy::new_without_default,
1930 reason = "explicit calls to new ensure correct backtrace bounds"
1931 )]
1932 pub fn new() -> HydroIrOpMetadata {
1933 Self::new_with_skip(1)
1934 }
1935
1936 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1937 HydroIrOpMetadata {
1938 backtrace: Backtrace::get_backtrace(2 + skip_count),
1939 cpu_usage: None,
1940 network_recv_cpu_usage: None,
1941 id: None,
1942 }
1943 }
1944}
1945
1946impl Debug for HydroIrOpMetadata {
1947 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1948 f.debug_struct("HydroIrOpMetadata").finish()
1949 }
1950}
1951
1952impl Hash for HydroIrOpMetadata {
1953 fn hash<H: Hasher>(&self, _: &mut H) {}
1954}
1955
1956#[derive(Debug, Hash, serde::Serialize)]
1959pub enum HydroNode {
1960 Placeholder,
1961
1962 Cast {
1970 inner: Box<HydroNode>,
1971 metadata: HydroIrMetadata,
1972 },
1973
1974 ObserveNonDet {
1980 inner: Box<HydroNode>,
1981 trusted: bool, metadata: HydroIrMetadata,
1983 },
1984
1985 Source {
1986 source: HydroSource,
1987 metadata: HydroIrMetadata,
1988 },
1989
1990 SingletonSource {
1991 value: DebugExpr,
1992 first_tick_only: bool,
1993 metadata: HydroIrMetadata,
1994 },
1995
1996 CycleSource {
1997 cycle_id: CycleId,
1998 metadata: HydroIrMetadata,
1999 },
2000
2001 Tee {
2002 inner: SharedNode,
2003 metadata: HydroIrMetadata,
2004 },
2005
2006 Partition {
2007 inner: SharedNode,
2008 f: DebugExpr,
2009 is_true: bool,
2010 metadata: HydroIrMetadata,
2011 },
2012
2013 BeginAtomic {
2014 inner: Box<HydroNode>,
2015 metadata: HydroIrMetadata,
2016 },
2017
2018 EndAtomic {
2019 inner: Box<HydroNode>,
2020 metadata: HydroIrMetadata,
2021 },
2022
2023 Batch {
2024 inner: Box<HydroNode>,
2025 metadata: HydroIrMetadata,
2026 },
2027
2028 YieldConcat {
2029 inner: Box<HydroNode>,
2030 metadata: HydroIrMetadata,
2031 },
2032
2033 Chain {
2034 first: Box<HydroNode>,
2035 second: Box<HydroNode>,
2036 metadata: HydroIrMetadata,
2037 },
2038
2039 ChainFirst {
2040 first: Box<HydroNode>,
2041 second: Box<HydroNode>,
2042 metadata: HydroIrMetadata,
2043 },
2044
2045 CrossProduct {
2046 left: Box<HydroNode>,
2047 right: Box<HydroNode>,
2048 metadata: HydroIrMetadata,
2049 },
2050
2051 CrossSingleton {
2052 left: Box<HydroNode>,
2053 right: Box<HydroNode>,
2054 metadata: HydroIrMetadata,
2055 },
2056
2057 Join {
2058 left: Box<HydroNode>,
2059 right: Box<HydroNode>,
2060 metadata: HydroIrMetadata,
2061 },
2062
2063 JoinHalf {
2067 left: Box<HydroNode>,
2068 right: Box<HydroNode>,
2069 metadata: HydroIrMetadata,
2070 },
2071
2072 Difference {
2073 pos: Box<HydroNode>,
2074 neg: Box<HydroNode>,
2075 metadata: HydroIrMetadata,
2076 },
2077
2078 AntiJoin {
2079 pos: Box<HydroNode>,
2080 neg: Box<HydroNode>,
2081 metadata: HydroIrMetadata,
2082 },
2083
2084 ResolveFutures {
2085 input: Box<HydroNode>,
2086 metadata: HydroIrMetadata,
2087 },
2088 ResolveFuturesBlocking {
2089 input: Box<HydroNode>,
2090 metadata: HydroIrMetadata,
2091 },
2092 ResolveFuturesOrdered {
2093 input: Box<HydroNode>,
2094 metadata: HydroIrMetadata,
2095 },
2096
2097 Map {
2098 f: DebugExpr,
2099 input: Box<HydroNode>,
2100 metadata: HydroIrMetadata,
2101 },
2102 FlatMap {
2103 f: DebugExpr,
2104 input: Box<HydroNode>,
2105 metadata: HydroIrMetadata,
2106 },
2107 FlatMapStreamBlocking {
2108 f: DebugExpr,
2109 input: Box<HydroNode>,
2110 metadata: HydroIrMetadata,
2111 },
2112 Filter {
2113 f: DebugExpr,
2114 input: Box<HydroNode>,
2115 metadata: HydroIrMetadata,
2116 },
2117 FilterMap {
2118 f: DebugExpr,
2119 input: Box<HydroNode>,
2120 metadata: HydroIrMetadata,
2121 },
2122
2123 DeferTick {
2124 input: Box<HydroNode>,
2125 metadata: HydroIrMetadata,
2126 },
2127 Enumerate {
2128 input: Box<HydroNode>,
2129 metadata: HydroIrMetadata,
2130 },
2131 Inspect {
2132 f: DebugExpr,
2133 input: Box<HydroNode>,
2134 metadata: HydroIrMetadata,
2135 },
2136
2137 Unique {
2138 input: Box<HydroNode>,
2139 metadata: HydroIrMetadata,
2140 },
2141
2142 Sort {
2143 input: Box<HydroNode>,
2144 metadata: HydroIrMetadata,
2145 },
2146 Fold {
2147 init: DebugExpr,
2148 acc: DebugExpr,
2149 input: Box<HydroNode>,
2150 metadata: HydroIrMetadata,
2151 },
2152
2153 Scan {
2154 init: DebugExpr,
2155 acc: DebugExpr,
2156 input: Box<HydroNode>,
2157 metadata: HydroIrMetadata,
2158 },
2159 ScanAsyncBlocking {
2160 init: DebugExpr,
2161 acc: DebugExpr,
2162 input: Box<HydroNode>,
2163 metadata: HydroIrMetadata,
2164 },
2165 FoldKeyed {
2166 init: DebugExpr,
2167 acc: DebugExpr,
2168 input: Box<HydroNode>,
2169 metadata: HydroIrMetadata,
2170 },
2171
2172 Reduce {
2173 f: DebugExpr,
2174 input: Box<HydroNode>,
2175 metadata: HydroIrMetadata,
2176 },
2177 ReduceKeyed {
2178 f: DebugExpr,
2179 input: Box<HydroNode>,
2180 metadata: HydroIrMetadata,
2181 },
2182 ReduceKeyedWatermark {
2183 f: DebugExpr,
2184 input: Box<HydroNode>,
2185 watermark: Box<HydroNode>,
2186 metadata: HydroIrMetadata,
2187 },
2188
2189 Network {
2190 name: Option<String>,
2191 networking_info: crate::networking::NetworkingInfo,
2192 serialize_fn: Option<DebugExpr>,
2193 instantiate_fn: DebugInstantiate,
2194 deserialize_fn: Option<DebugExpr>,
2195 input: Box<HydroNode>,
2196 metadata: HydroIrMetadata,
2197 },
2198
2199 ExternalInput {
2200 from_external_key: LocationKey,
2201 from_port_id: ExternalPortId,
2202 from_many: bool,
2203 codec_type: DebugType,
2204 #[serde(skip)]
2205 port_hint: NetworkHint,
2206 instantiate_fn: DebugInstantiate,
2207 deserialize_fn: Option<DebugExpr>,
2208 metadata: HydroIrMetadata,
2209 },
2210
2211 Counter {
2212 tag: String,
2213 duration: DebugExpr,
2214 prefix: String,
2215 input: Box<HydroNode>,
2216 metadata: HydroIrMetadata,
2217 },
2218}
2219
2220pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2221pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2222
2223impl HydroNode {
2224 pub fn transform_bottom_up(
2225 &mut self,
2226 transform: &mut impl FnMut(&mut HydroNode),
2227 seen_tees: &mut SeenSharedNodes,
2228 check_well_formed: bool,
2229 ) {
2230 self.transform_children(
2231 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2232 seen_tees,
2233 );
2234
2235 transform(self);
2236
2237 let self_location = self.metadata().location_id.root();
2238
2239 if check_well_formed {
2240 match &*self {
2241 HydroNode::Network { .. } => {}
2242 _ => {
2243 self.input_metadata().iter().for_each(|i| {
2244 if i.location_id.root() != self_location {
2245 panic!(
2246 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2247 i,
2248 i.location_id.root(),
2249 self,
2250 self_location
2251 )
2252 }
2253 });
2254 }
2255 }
2256 }
2257 }
2258
2259 #[inline(always)]
2260 pub fn transform_children(
2261 &mut self,
2262 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2263 seen_tees: &mut SeenSharedNodes,
2264 ) {
2265 match self {
2266 HydroNode::Placeholder => {
2267 panic!();
2268 }
2269
2270 HydroNode::Source { .. }
2271 | HydroNode::SingletonSource { .. }
2272 | HydroNode::CycleSource { .. }
2273 | HydroNode::ExternalInput { .. } => {}
2274
2275 HydroNode::Tee { inner, .. } => {
2276 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2277 *inner = SharedNode(transformed.clone());
2278 } else {
2279 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2280 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2281 let mut orig = inner.0.replace(HydroNode::Placeholder);
2282 transform(&mut orig, seen_tees);
2283 *transformed_cell.borrow_mut() = orig;
2284 *inner = SharedNode(transformed_cell);
2285 }
2286 }
2287
2288 HydroNode::Partition { inner, .. } => {
2289 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2290 *inner = SharedNode(transformed.clone());
2291 } else {
2292 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2293 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2294 let mut orig = inner.0.replace(HydroNode::Placeholder);
2295 transform(&mut orig, seen_tees);
2296 *transformed_cell.borrow_mut() = orig;
2297 *inner = SharedNode(transformed_cell);
2298 }
2299 }
2300
2301 HydroNode::Cast { inner, .. }
2302 | HydroNode::ObserveNonDet { inner, .. }
2303 | HydroNode::BeginAtomic { inner, .. }
2304 | HydroNode::EndAtomic { inner, .. }
2305 | HydroNode::Batch { inner, .. }
2306 | HydroNode::YieldConcat { inner, .. } => {
2307 transform(inner.as_mut(), seen_tees);
2308 }
2309
2310 HydroNode::Chain { first, second, .. } => {
2311 transform(first.as_mut(), seen_tees);
2312 transform(second.as_mut(), seen_tees);
2313 }
2314
2315 HydroNode::ChainFirst { first, second, .. } => {
2316 transform(first.as_mut(), seen_tees);
2317 transform(second.as_mut(), seen_tees);
2318 }
2319
2320 HydroNode::CrossSingleton { left, right, .. }
2321 | HydroNode::CrossProduct { left, right, .. }
2322 | HydroNode::Join { left, right, .. }
2323 | HydroNode::JoinHalf { left, right, .. } => {
2324 transform(left.as_mut(), seen_tees);
2325 transform(right.as_mut(), seen_tees);
2326 }
2327
2328 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2329 transform(pos.as_mut(), seen_tees);
2330 transform(neg.as_mut(), seen_tees);
2331 }
2332
2333 HydroNode::ReduceKeyedWatermark {
2334 input, watermark, ..
2335 } => {
2336 transform(input.as_mut(), seen_tees);
2337 transform(watermark.as_mut(), seen_tees);
2338 }
2339
2340 HydroNode::Map { input, .. }
2341 | HydroNode::ResolveFutures { input, .. }
2342 | HydroNode::ResolveFuturesBlocking { input, .. }
2343 | HydroNode::ResolveFuturesOrdered { input, .. }
2344 | HydroNode::FlatMap { input, .. }
2345 | HydroNode::FlatMapStreamBlocking { input, .. }
2346 | HydroNode::Filter { input, .. }
2347 | HydroNode::FilterMap { input, .. }
2348 | HydroNode::Sort { input, .. }
2349 | HydroNode::DeferTick { input, .. }
2350 | HydroNode::Enumerate { input, .. }
2351 | HydroNode::Inspect { input, .. }
2352 | HydroNode::Unique { input, .. }
2353 | HydroNode::Network { input, .. }
2354 | HydroNode::Fold { input, .. }
2355 | HydroNode::Scan { input, .. }
2356 | HydroNode::ScanAsyncBlocking { input, .. }
2357 | HydroNode::FoldKeyed { input, .. }
2358 | HydroNode::Reduce { input, .. }
2359 | HydroNode::ReduceKeyed { input, .. }
2360 | HydroNode::Counter { input, .. } => {
2361 transform(input.as_mut(), seen_tees);
2362 }
2363 }
2364 }
2365
2366 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2367 match self {
2368 HydroNode::Placeholder => HydroNode::Placeholder,
2369 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2370 inner: Box::new(inner.deep_clone(seen_tees)),
2371 metadata: metadata.clone(),
2372 },
2373 HydroNode::ObserveNonDet {
2374 inner,
2375 trusted,
2376 metadata,
2377 } => HydroNode::ObserveNonDet {
2378 inner: Box::new(inner.deep_clone(seen_tees)),
2379 trusted: *trusted,
2380 metadata: metadata.clone(),
2381 },
2382 HydroNode::Source { source, metadata } => HydroNode::Source {
2383 source: source.clone(),
2384 metadata: metadata.clone(),
2385 },
2386 HydroNode::SingletonSource {
2387 value,
2388 first_tick_only,
2389 metadata,
2390 } => HydroNode::SingletonSource {
2391 value: value.clone(),
2392 first_tick_only: *first_tick_only,
2393 metadata: metadata.clone(),
2394 },
2395 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2396 cycle_id: *cycle_id,
2397 metadata: metadata.clone(),
2398 },
2399 HydroNode::Tee { inner, metadata } => {
2400 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2401 HydroNode::Tee {
2402 inner: SharedNode(transformed.clone()),
2403 metadata: metadata.clone(),
2404 }
2405 } else {
2406 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2407 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2408 let cloned = inner.0.borrow().deep_clone(seen_tees);
2409 *new_rc.borrow_mut() = cloned;
2410 HydroNode::Tee {
2411 inner: SharedNode(new_rc),
2412 metadata: metadata.clone(),
2413 }
2414 }
2415 }
2416 HydroNode::Partition {
2417 inner,
2418 f,
2419 is_true,
2420 metadata,
2421 } => {
2422 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2423 HydroNode::Partition {
2424 inner: SharedNode(transformed.clone()),
2425 f: f.clone(),
2426 is_true: *is_true,
2427 metadata: metadata.clone(),
2428 }
2429 } else {
2430 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2431 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2432 let cloned = inner.0.borrow().deep_clone(seen_tees);
2433 *new_rc.borrow_mut() = cloned;
2434 HydroNode::Partition {
2435 inner: SharedNode(new_rc),
2436 f: f.clone(),
2437 is_true: *is_true,
2438 metadata: metadata.clone(),
2439 }
2440 }
2441 }
2442 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2443 inner: Box::new(inner.deep_clone(seen_tees)),
2444 metadata: metadata.clone(),
2445 },
2446 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2447 inner: Box::new(inner.deep_clone(seen_tees)),
2448 metadata: metadata.clone(),
2449 },
2450 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2451 inner: Box::new(inner.deep_clone(seen_tees)),
2452 metadata: metadata.clone(),
2453 },
2454 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2455 inner: Box::new(inner.deep_clone(seen_tees)),
2456 metadata: metadata.clone(),
2457 },
2458 HydroNode::Chain {
2459 first,
2460 second,
2461 metadata,
2462 } => HydroNode::Chain {
2463 first: Box::new(first.deep_clone(seen_tees)),
2464 second: Box::new(second.deep_clone(seen_tees)),
2465 metadata: metadata.clone(),
2466 },
2467 HydroNode::ChainFirst {
2468 first,
2469 second,
2470 metadata,
2471 } => HydroNode::ChainFirst {
2472 first: Box::new(first.deep_clone(seen_tees)),
2473 second: Box::new(second.deep_clone(seen_tees)),
2474 metadata: metadata.clone(),
2475 },
2476 HydroNode::CrossProduct {
2477 left,
2478 right,
2479 metadata,
2480 } => HydroNode::CrossProduct {
2481 left: Box::new(left.deep_clone(seen_tees)),
2482 right: Box::new(right.deep_clone(seen_tees)),
2483 metadata: metadata.clone(),
2484 },
2485 HydroNode::CrossSingleton {
2486 left,
2487 right,
2488 metadata,
2489 } => HydroNode::CrossSingleton {
2490 left: Box::new(left.deep_clone(seen_tees)),
2491 right: Box::new(right.deep_clone(seen_tees)),
2492 metadata: metadata.clone(),
2493 },
2494 HydroNode::Join {
2495 left,
2496 right,
2497 metadata,
2498 } => HydroNode::Join {
2499 left: Box::new(left.deep_clone(seen_tees)),
2500 right: Box::new(right.deep_clone(seen_tees)),
2501 metadata: metadata.clone(),
2502 },
2503 HydroNode::JoinHalf {
2504 left,
2505 right,
2506 metadata,
2507 } => HydroNode::JoinHalf {
2508 left: Box::new(left.deep_clone(seen_tees)),
2509 right: Box::new(right.deep_clone(seen_tees)),
2510 metadata: metadata.clone(),
2511 },
2512 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2513 pos: Box::new(pos.deep_clone(seen_tees)),
2514 neg: Box::new(neg.deep_clone(seen_tees)),
2515 metadata: metadata.clone(),
2516 },
2517 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2518 pos: Box::new(pos.deep_clone(seen_tees)),
2519 neg: Box::new(neg.deep_clone(seen_tees)),
2520 metadata: metadata.clone(),
2521 },
2522 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2523 input: Box::new(input.deep_clone(seen_tees)),
2524 metadata: metadata.clone(),
2525 },
2526 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2527 HydroNode::ResolveFuturesBlocking {
2528 input: Box::new(input.deep_clone(seen_tees)),
2529 metadata: metadata.clone(),
2530 }
2531 }
2532 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2533 HydroNode::ResolveFuturesOrdered {
2534 input: Box::new(input.deep_clone(seen_tees)),
2535 metadata: metadata.clone(),
2536 }
2537 }
2538 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2539 f: f.clone(),
2540 input: Box::new(input.deep_clone(seen_tees)),
2541 metadata: metadata.clone(),
2542 },
2543 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2544 f: f.clone(),
2545 input: Box::new(input.deep_clone(seen_tees)),
2546 metadata: metadata.clone(),
2547 },
2548 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2549 HydroNode::FlatMapStreamBlocking {
2550 f: f.clone(),
2551 input: Box::new(input.deep_clone(seen_tees)),
2552 metadata: metadata.clone(),
2553 }
2554 }
2555 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2556 f: f.clone(),
2557 input: Box::new(input.deep_clone(seen_tees)),
2558 metadata: metadata.clone(),
2559 },
2560 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2561 f: f.clone(),
2562 input: Box::new(input.deep_clone(seen_tees)),
2563 metadata: metadata.clone(),
2564 },
2565 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2566 input: Box::new(input.deep_clone(seen_tees)),
2567 metadata: metadata.clone(),
2568 },
2569 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2570 input: Box::new(input.deep_clone(seen_tees)),
2571 metadata: metadata.clone(),
2572 },
2573 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2574 f: f.clone(),
2575 input: Box::new(input.deep_clone(seen_tees)),
2576 metadata: metadata.clone(),
2577 },
2578 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2579 input: Box::new(input.deep_clone(seen_tees)),
2580 metadata: metadata.clone(),
2581 },
2582 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2583 input: Box::new(input.deep_clone(seen_tees)),
2584 metadata: metadata.clone(),
2585 },
2586 HydroNode::Fold {
2587 init,
2588 acc,
2589 input,
2590 metadata,
2591 } => HydroNode::Fold {
2592 init: init.clone(),
2593 acc: acc.clone(),
2594 input: Box::new(input.deep_clone(seen_tees)),
2595 metadata: metadata.clone(),
2596 },
2597 HydroNode::Scan {
2598 init,
2599 acc,
2600 input,
2601 metadata,
2602 } => HydroNode::Scan {
2603 init: init.clone(),
2604 acc: acc.clone(),
2605 input: Box::new(input.deep_clone(seen_tees)),
2606 metadata: metadata.clone(),
2607 },
2608 HydroNode::ScanAsyncBlocking {
2609 init,
2610 acc,
2611 input,
2612 metadata,
2613 } => HydroNode::ScanAsyncBlocking {
2614 init: init.clone(),
2615 acc: acc.clone(),
2616 input: Box::new(input.deep_clone(seen_tees)),
2617 metadata: metadata.clone(),
2618 },
2619 HydroNode::FoldKeyed {
2620 init,
2621 acc,
2622 input,
2623 metadata,
2624 } => HydroNode::FoldKeyed {
2625 init: init.clone(),
2626 acc: acc.clone(),
2627 input: Box::new(input.deep_clone(seen_tees)),
2628 metadata: metadata.clone(),
2629 },
2630 HydroNode::ReduceKeyedWatermark {
2631 f,
2632 input,
2633 watermark,
2634 metadata,
2635 } => HydroNode::ReduceKeyedWatermark {
2636 f: f.clone(),
2637 input: Box::new(input.deep_clone(seen_tees)),
2638 watermark: Box::new(watermark.deep_clone(seen_tees)),
2639 metadata: metadata.clone(),
2640 },
2641 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2642 f: f.clone(),
2643 input: Box::new(input.deep_clone(seen_tees)),
2644 metadata: metadata.clone(),
2645 },
2646 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2647 f: f.clone(),
2648 input: Box::new(input.deep_clone(seen_tees)),
2649 metadata: metadata.clone(),
2650 },
2651 HydroNode::Network {
2652 name,
2653 networking_info,
2654 serialize_fn,
2655 instantiate_fn,
2656 deserialize_fn,
2657 input,
2658 metadata,
2659 } => HydroNode::Network {
2660 name: name.clone(),
2661 networking_info: networking_info.clone(),
2662 serialize_fn: serialize_fn.clone(),
2663 instantiate_fn: instantiate_fn.clone(),
2664 deserialize_fn: deserialize_fn.clone(),
2665 input: Box::new(input.deep_clone(seen_tees)),
2666 metadata: metadata.clone(),
2667 },
2668 HydroNode::ExternalInput {
2669 from_external_key,
2670 from_port_id,
2671 from_many,
2672 codec_type,
2673 port_hint,
2674 instantiate_fn,
2675 deserialize_fn,
2676 metadata,
2677 } => HydroNode::ExternalInput {
2678 from_external_key: *from_external_key,
2679 from_port_id: *from_port_id,
2680 from_many: *from_many,
2681 codec_type: codec_type.clone(),
2682 port_hint: *port_hint,
2683 instantiate_fn: instantiate_fn.clone(),
2684 deserialize_fn: deserialize_fn.clone(),
2685 metadata: metadata.clone(),
2686 },
2687 HydroNode::Counter {
2688 tag,
2689 duration,
2690 prefix,
2691 input,
2692 metadata,
2693 } => HydroNode::Counter {
2694 tag: tag.clone(),
2695 duration: duration.clone(),
2696 prefix: prefix.clone(),
2697 input: Box::new(input.deep_clone(seen_tees)),
2698 metadata: metadata.clone(),
2699 },
2700 }
2701 }
2702
2703 #[cfg(feature = "build")]
2704 pub fn emit_core(
2705 &mut self,
2706 builders_or_callback: &mut BuildersOrCallback<
2707 impl FnMut(&mut HydroRoot, &mut usize),
2708 impl FnMut(&mut HydroNode, &mut usize),
2709 >,
2710 seen_tees: &mut SeenSharedNodes,
2711 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2712 next_stmt_id: &mut usize,
2713 ) -> syn::Ident {
2714 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2715
2716 self.transform_bottom_up(
2717 &mut |node: &mut HydroNode| {
2718 let out_location = node.metadata().location_id.clone();
2719 match node {
2720 HydroNode::Placeholder => {
2721 panic!()
2722 }
2723
2724 HydroNode::Cast { .. } => {
2725 match builders_or_callback {
2728 BuildersOrCallback::Builders(_) => {}
2729 BuildersOrCallback::Callback(_, node_callback) => {
2730 node_callback(node, next_stmt_id);
2731 }
2732 }
2733
2734 *next_stmt_id += 1;
2735 }
2737
2738 HydroNode::ObserveNonDet {
2739 inner,
2740 trusted,
2741 metadata,
2742 ..
2743 } => {
2744 let inner_ident = ident_stack.pop().unwrap();
2745
2746 let observe_ident =
2747 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2748
2749 match builders_or_callback {
2750 BuildersOrCallback::Builders(graph_builders) => {
2751 graph_builders.observe_nondet(
2752 *trusted,
2753 &inner.metadata().location_id,
2754 inner_ident,
2755 &inner.metadata().collection_kind,
2756 &observe_ident,
2757 &metadata.collection_kind,
2758 &metadata.op,
2759 );
2760 }
2761 BuildersOrCallback::Callback(_, node_callback) => {
2762 node_callback(node, next_stmt_id);
2763 }
2764 }
2765
2766 *next_stmt_id += 1;
2767
2768 ident_stack.push(observe_ident);
2769 }
2770
2771 HydroNode::Batch {
2772 inner, metadata, ..
2773 } => {
2774 let inner_ident = ident_stack.pop().unwrap();
2775
2776 let batch_ident =
2777 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2778
2779 match builders_or_callback {
2780 BuildersOrCallback::Builders(graph_builders) => {
2781 graph_builders.batch(
2782 inner_ident,
2783 &inner.metadata().location_id,
2784 &inner.metadata().collection_kind,
2785 &batch_ident,
2786 &out_location,
2787 &metadata.op,
2788 );
2789 }
2790 BuildersOrCallback::Callback(_, node_callback) => {
2791 node_callback(node, next_stmt_id);
2792 }
2793 }
2794
2795 *next_stmt_id += 1;
2796
2797 ident_stack.push(batch_ident);
2798 }
2799
2800 HydroNode::YieldConcat { inner, .. } => {
2801 let inner_ident = ident_stack.pop().unwrap();
2802
2803 let yield_ident =
2804 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2805
2806 match builders_or_callback {
2807 BuildersOrCallback::Builders(graph_builders) => {
2808 graph_builders.yield_from_tick(
2809 inner_ident,
2810 &inner.metadata().location_id,
2811 &inner.metadata().collection_kind,
2812 &yield_ident,
2813 &out_location,
2814 );
2815 }
2816 BuildersOrCallback::Callback(_, node_callback) => {
2817 node_callback(node, next_stmt_id);
2818 }
2819 }
2820
2821 *next_stmt_id += 1;
2822
2823 ident_stack.push(yield_ident);
2824 }
2825
2826 HydroNode::BeginAtomic { inner, metadata } => {
2827 let inner_ident = ident_stack.pop().unwrap();
2828
2829 let begin_ident =
2830 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832 match builders_or_callback {
2833 BuildersOrCallback::Builders(graph_builders) => {
2834 graph_builders.begin_atomic(
2835 inner_ident,
2836 &inner.metadata().location_id,
2837 &inner.metadata().collection_kind,
2838 &begin_ident,
2839 &out_location,
2840 &metadata.op,
2841 );
2842 }
2843 BuildersOrCallback::Callback(_, node_callback) => {
2844 node_callback(node, next_stmt_id);
2845 }
2846 }
2847
2848 *next_stmt_id += 1;
2849
2850 ident_stack.push(begin_ident);
2851 }
2852
2853 HydroNode::EndAtomic { inner, .. } => {
2854 let inner_ident = ident_stack.pop().unwrap();
2855
2856 let end_ident =
2857 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2858
2859 match builders_or_callback {
2860 BuildersOrCallback::Builders(graph_builders) => {
2861 graph_builders.end_atomic(
2862 inner_ident,
2863 &inner.metadata().location_id,
2864 &inner.metadata().collection_kind,
2865 &end_ident,
2866 );
2867 }
2868 BuildersOrCallback::Callback(_, node_callback) => {
2869 node_callback(node, next_stmt_id);
2870 }
2871 }
2872
2873 *next_stmt_id += 1;
2874
2875 ident_stack.push(end_ident);
2876 }
2877
2878 HydroNode::Source {
2879 source, metadata, ..
2880 } => {
2881 if let HydroSource::ExternalNetwork() = source {
2882 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2883 } else {
2884 let source_ident =
2885 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2886
2887 let source_stmt = match source {
2888 HydroSource::Stream(expr) => {
2889 debug_assert!(metadata.location_id.is_top_level());
2890 parse_quote! {
2891 #source_ident = source_stream(#expr);
2892 }
2893 }
2894
2895 HydroSource::ExternalNetwork() => {
2896 unreachable!()
2897 }
2898
2899 HydroSource::Iter(expr) => {
2900 if metadata.location_id.is_top_level() {
2901 parse_quote! {
2902 #source_ident = source_iter(#expr);
2903 }
2904 } else {
2905 parse_quote! {
2907 #source_ident = source_iter(#expr) -> persist::<'static>();
2908 }
2909 }
2910 }
2911
2912 HydroSource::Spin() => {
2913 debug_assert!(metadata.location_id.is_top_level());
2914 parse_quote! {
2915 #source_ident = spin();
2916 }
2917 }
2918
2919 HydroSource::ClusterMembers(target_loc, state) => {
2920 debug_assert!(metadata.location_id.is_top_level());
2921
2922 let members_tee_ident = syn::Ident::new(
2923 &format!(
2924 "__cluster_members_tee_{}_{}",
2925 metadata.location_id.root().key(),
2926 target_loc.key(),
2927 ),
2928 Span::call_site(),
2929 );
2930
2931 match state {
2932 ClusterMembersState::Stream(d) => {
2933 parse_quote! {
2934 #members_tee_ident = source_stream(#d) -> tee();
2935 #source_ident = #members_tee_ident;
2936 }
2937 },
2938 ClusterMembersState::Uninit => syn::parse_quote! {
2939 #source_ident = source_stream(DUMMY);
2940 },
2941 ClusterMembersState::Tee(..) => parse_quote! {
2942 #source_ident = #members_tee_ident;
2943 },
2944 }
2945 }
2946
2947 HydroSource::Embedded(ident) => {
2948 parse_quote! {
2949 #source_ident = source_stream(#ident);
2950 }
2951 }
2952
2953 HydroSource::EmbeddedSingleton(ident) => {
2954 parse_quote! {
2955 #source_ident = source_iter([#ident]);
2956 }
2957 }
2958 };
2959
2960 match builders_or_callback {
2961 BuildersOrCallback::Builders(graph_builders) => {
2962 let builder = graph_builders.get_dfir_mut(&out_location);
2963 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2964 }
2965 BuildersOrCallback::Callback(_, node_callback) => {
2966 node_callback(node, next_stmt_id);
2967 }
2968 }
2969
2970 *next_stmt_id += 1;
2971
2972 ident_stack.push(source_ident);
2973 }
2974 }
2975
2976 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2977 let source_ident =
2978 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2979
2980 match builders_or_callback {
2981 BuildersOrCallback::Builders(graph_builders) => {
2982 let builder = graph_builders.get_dfir_mut(&out_location);
2983
2984 if *first_tick_only {
2985 assert!(
2986 !metadata.location_id.is_top_level(),
2987 "first_tick_only SingletonSource must be inside a tick"
2988 );
2989 }
2990
2991 if *first_tick_only
2992 || (metadata.location_id.is_top_level()
2993 && metadata.collection_kind.is_bounded())
2994 {
2995 builder.add_dfir(
2996 parse_quote! {
2997 #source_ident = source_iter([#value]);
2998 },
2999 None,
3000 Some(&next_stmt_id.to_string()),
3001 );
3002 } else {
3003 builder.add_dfir(
3004 parse_quote! {
3005 #source_ident = source_iter([#value]) -> persist::<'static>();
3006 },
3007 None,
3008 Some(&next_stmt_id.to_string()),
3009 );
3010 }
3011 }
3012 BuildersOrCallback::Callback(_, node_callback) => {
3013 node_callback(node, next_stmt_id);
3014 }
3015 }
3016
3017 *next_stmt_id += 1;
3018
3019 ident_stack.push(source_ident);
3020 }
3021
3022 HydroNode::CycleSource { cycle_id, .. } => {
3023 let ident = cycle_id.as_ident();
3024
3025 match builders_or_callback {
3026 BuildersOrCallback::Builders(_) => {}
3027 BuildersOrCallback::Callback(_, node_callback) => {
3028 node_callback(node, next_stmt_id);
3029 }
3030 }
3031
3032 *next_stmt_id += 1;
3034
3035 ident_stack.push(ident);
3036 }
3037
3038 HydroNode::Tee { inner, .. } => {
3039 let ret_ident = if let Some(built_idents) =
3040 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3041 {
3042 match builders_or_callback {
3043 BuildersOrCallback::Builders(_) => {}
3044 BuildersOrCallback::Callback(_, node_callback) => {
3045 node_callback(node, next_stmt_id);
3046 }
3047 }
3048
3049 built_idents[0].clone()
3050 } else {
3051 let inner_ident = ident_stack.pop().unwrap();
3054
3055 let tee_ident =
3056 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3057
3058 built_tees.insert(
3059 inner.0.as_ref() as *const RefCell<HydroNode>,
3060 vec![tee_ident.clone()],
3061 );
3062
3063 match builders_or_callback {
3064 BuildersOrCallback::Builders(graph_builders) => {
3065 let builder = graph_builders.get_dfir_mut(&out_location);
3066 builder.add_dfir(
3067 parse_quote! {
3068 #tee_ident = #inner_ident -> tee();
3069 },
3070 None,
3071 Some(&next_stmt_id.to_string()),
3072 );
3073 }
3074 BuildersOrCallback::Callback(_, node_callback) => {
3075 node_callback(node, next_stmt_id);
3076 }
3077 }
3078
3079 tee_ident
3080 };
3081
3082 *next_stmt_id += 1;
3086 ident_stack.push(ret_ident);
3087 }
3088
3089 HydroNode::Partition {
3090 inner, f, is_true, ..
3091 } => {
3092 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3094 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3095 match builders_or_callback {
3096 BuildersOrCallback::Builders(_) => {}
3097 BuildersOrCallback::Callback(_, node_callback) => {
3098 node_callback(node, next_stmt_id);
3099 }
3100 }
3101
3102 let idx = if is_true { 0 } else { 1 };
3103 built_idents[idx].clone()
3104 } else {
3105 let inner_ident = ident_stack.pop().unwrap();
3108
3109 let partition_ident = syn::Ident::new(
3110 &format!("stream_{}_partition", *next_stmt_id),
3111 Span::call_site(),
3112 );
3113 let true_ident = syn::Ident::new(
3114 &format!("stream_{}_true", *next_stmt_id),
3115 Span::call_site(),
3116 );
3117 let false_ident = syn::Ident::new(
3118 &format!("stream_{}_false", *next_stmt_id),
3119 Span::call_site(),
3120 );
3121
3122 built_tees.insert(
3123 ptr,
3124 vec![true_ident.clone(), false_ident.clone()],
3125 );
3126
3127 match builders_or_callback {
3128 BuildersOrCallback::Builders(graph_builders) => {
3129 let builder = graph_builders.get_dfir_mut(&out_location);
3130 builder.add_dfir(
3131 parse_quote! {
3132 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3133 #true_ident = #partition_ident[0];
3134 #false_ident = #partition_ident[1];
3135 },
3136 None,
3137 Some(&next_stmt_id.to_string()),
3138 );
3139 }
3140 BuildersOrCallback::Callback(_, node_callback) => {
3141 node_callback(node, next_stmt_id);
3142 }
3143 }
3144
3145 if is_true { true_ident } else { false_ident }
3146 };
3147
3148 *next_stmt_id += 1;
3149 ident_stack.push(ret_ident);
3150 }
3151
3152 HydroNode::Chain { .. } => {
3153 let second_ident = ident_stack.pop().unwrap();
3155 let first_ident = ident_stack.pop().unwrap();
3156
3157 let chain_ident =
3158 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3159
3160 match builders_or_callback {
3161 BuildersOrCallback::Builders(graph_builders) => {
3162 let builder = graph_builders.get_dfir_mut(&out_location);
3163 builder.add_dfir(
3164 parse_quote! {
3165 #chain_ident = chain();
3166 #first_ident -> [0]#chain_ident;
3167 #second_ident -> [1]#chain_ident;
3168 },
3169 None,
3170 Some(&next_stmt_id.to_string()),
3171 );
3172 }
3173 BuildersOrCallback::Callback(_, node_callback) => {
3174 node_callback(node, next_stmt_id);
3175 }
3176 }
3177
3178 *next_stmt_id += 1;
3179
3180 ident_stack.push(chain_ident);
3181 }
3182
3183 HydroNode::ChainFirst { .. } => {
3184 let second_ident = ident_stack.pop().unwrap();
3185 let first_ident = ident_stack.pop().unwrap();
3186
3187 let chain_ident =
3188 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3189
3190 match builders_or_callback {
3191 BuildersOrCallback::Builders(graph_builders) => {
3192 let builder = graph_builders.get_dfir_mut(&out_location);
3193 builder.add_dfir(
3194 parse_quote! {
3195 #chain_ident = chain_first_n(1);
3196 #first_ident -> [0]#chain_ident;
3197 #second_ident -> [1]#chain_ident;
3198 },
3199 None,
3200 Some(&next_stmt_id.to_string()),
3201 );
3202 }
3203 BuildersOrCallback::Callback(_, node_callback) => {
3204 node_callback(node, next_stmt_id);
3205 }
3206 }
3207
3208 *next_stmt_id += 1;
3209
3210 ident_stack.push(chain_ident);
3211 }
3212
3213 HydroNode::CrossSingleton { right, .. } => {
3214 let right_ident = ident_stack.pop().unwrap();
3215 let left_ident = ident_stack.pop().unwrap();
3216
3217 let cross_ident =
3218 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3219
3220 match builders_or_callback {
3221 BuildersOrCallback::Builders(graph_builders) => {
3222 let builder = graph_builders.get_dfir_mut(&out_location);
3223
3224 if right.metadata().location_id.is_top_level()
3225 && right.metadata().collection_kind.is_bounded()
3226 {
3227 builder.add_dfir(
3228 parse_quote! {
3229 #cross_ident = cross_singleton();
3230 #left_ident -> [input]#cross_ident;
3231 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3232 },
3233 None,
3234 Some(&next_stmt_id.to_string()),
3235 );
3236 } else {
3237 builder.add_dfir(
3238 parse_quote! {
3239 #cross_ident = cross_singleton();
3240 #left_ident -> [input]#cross_ident;
3241 #right_ident -> [single]#cross_ident;
3242 },
3243 None,
3244 Some(&next_stmt_id.to_string()),
3245 );
3246 }
3247 }
3248 BuildersOrCallback::Callback(_, node_callback) => {
3249 node_callback(node, next_stmt_id);
3250 }
3251 }
3252
3253 *next_stmt_id += 1;
3254
3255 ident_stack.push(cross_ident);
3256 }
3257
3258 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3259 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3260 parse_quote!(cross_join_multiset)
3261 } else {
3262 parse_quote!(join_multiset)
3263 };
3264
3265 let (HydroNode::CrossProduct { left, right, .. }
3266 | HydroNode::Join { left, right, .. }) = node
3267 else {
3268 unreachable!()
3269 };
3270
3271 let is_top_level = left.metadata().location_id.is_top_level()
3272 && right.metadata().location_id.is_top_level();
3273 let left_lifetime = if left.metadata().location_id.is_top_level() {
3274 quote!('static)
3275 } else {
3276 quote!('tick)
3277 };
3278
3279 let right_lifetime = if right.metadata().location_id.is_top_level() {
3280 quote!('static)
3281 } else {
3282 quote!('tick)
3283 };
3284
3285 let right_ident = ident_stack.pop().unwrap();
3286 let left_ident = ident_stack.pop().unwrap();
3287
3288 let stream_ident =
3289 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3290
3291 match builders_or_callback {
3292 BuildersOrCallback::Builders(graph_builders) => {
3293 let builder = graph_builders.get_dfir_mut(&out_location);
3294 builder.add_dfir(
3295 if is_top_level {
3296 parse_quote! {
3299 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3300 #left_ident -> [0]#stream_ident;
3301 #right_ident -> [1]#stream_ident;
3302 }
3303 } else {
3304 parse_quote! {
3305 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3306 #left_ident -> [0]#stream_ident;
3307 #right_ident -> [1]#stream_ident;
3308 }
3309 }
3310 ,
3311 None,
3312 Some(&next_stmt_id.to_string()),
3313 );
3314 }
3315 BuildersOrCallback::Callback(_, node_callback) => {
3316 node_callback(node, next_stmt_id);
3317 }
3318 }
3319
3320 *next_stmt_id += 1;
3321
3322 ident_stack.push(stream_ident);
3323 }
3324
3325 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3326 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3327 parse_quote!(difference)
3328 } else {
3329 parse_quote!(anti_join)
3330 };
3331
3332 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3333 node
3334 else {
3335 unreachable!()
3336 };
3337
3338 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3339 quote!('static)
3340 } else {
3341 quote!('tick)
3342 };
3343
3344 let neg_ident = ident_stack.pop().unwrap();
3345 let pos_ident = ident_stack.pop().unwrap();
3346
3347 let stream_ident =
3348 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3349
3350 match builders_or_callback {
3351 BuildersOrCallback::Builders(graph_builders) => {
3352 let builder = graph_builders.get_dfir_mut(&out_location);
3353 builder.add_dfir(
3354 parse_quote! {
3355 #stream_ident = #operator::<'tick, #neg_lifetime>();
3356 #pos_ident -> [pos]#stream_ident;
3357 #neg_ident -> [neg]#stream_ident;
3358 },
3359 None,
3360 Some(&next_stmt_id.to_string()),
3361 );
3362 }
3363 BuildersOrCallback::Callback(_, node_callback) => {
3364 node_callback(node, next_stmt_id);
3365 }
3366 }
3367
3368 *next_stmt_id += 1;
3369
3370 ident_stack.push(stream_ident);
3371 }
3372
3373 HydroNode::JoinHalf { .. } => {
3374 let HydroNode::JoinHalf { right, .. } = node else {
3375 unreachable!()
3376 };
3377
3378 assert!(
3379 right.metadata().collection_kind.is_bounded(),
3380 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3381 right.metadata().collection_kind
3382 );
3383
3384 let build_lifetime = if right.metadata().location_id.is_top_level() {
3385 quote!('static)
3386 } else {
3387 quote!('tick)
3388 };
3389
3390 let build_ident = ident_stack.pop().unwrap();
3391 let probe_ident = ident_stack.pop().unwrap();
3392
3393 let stream_ident =
3394 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3395
3396 match builders_or_callback {
3397 BuildersOrCallback::Builders(graph_builders) => {
3398 let builder = graph_builders.get_dfir_mut(&out_location);
3399 builder.add_dfir(
3400 parse_quote! {
3401 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3402 #probe_ident -> [probe]#stream_ident;
3403 #build_ident -> [build]#stream_ident;
3404 },
3405 None,
3406 Some(&next_stmt_id.to_string()),
3407 );
3408 }
3409 BuildersOrCallback::Callback(_, node_callback) => {
3410 node_callback(node, next_stmt_id);
3411 }
3412 }
3413
3414 *next_stmt_id += 1;
3415
3416 ident_stack.push(stream_ident);
3417 }
3418
3419 HydroNode::ResolveFutures { .. } => {
3420 let input_ident = ident_stack.pop().unwrap();
3421
3422 let futures_ident =
3423 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3424
3425 match builders_or_callback {
3426 BuildersOrCallback::Builders(graph_builders) => {
3427 let builder = graph_builders.get_dfir_mut(&out_location);
3428 builder.add_dfir(
3429 parse_quote! {
3430 #futures_ident = #input_ident -> resolve_futures();
3431 },
3432 None,
3433 Some(&next_stmt_id.to_string()),
3434 );
3435 }
3436 BuildersOrCallback::Callback(_, node_callback) => {
3437 node_callback(node, next_stmt_id);
3438 }
3439 }
3440
3441 *next_stmt_id += 1;
3442
3443 ident_stack.push(futures_ident);
3444 }
3445
3446 HydroNode::ResolveFuturesBlocking { .. } => {
3447 let input_ident = ident_stack.pop().unwrap();
3448
3449 let futures_ident =
3450 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3451
3452 match builders_or_callback {
3453 BuildersOrCallback::Builders(graph_builders) => {
3454 let builder = graph_builders.get_dfir_mut(&out_location);
3455 builder.add_dfir(
3456 parse_quote! {
3457 #futures_ident = #input_ident -> resolve_futures_blocking();
3458 },
3459 None,
3460 Some(&next_stmt_id.to_string()),
3461 );
3462 }
3463 BuildersOrCallback::Callback(_, node_callback) => {
3464 node_callback(node, next_stmt_id);
3465 }
3466 }
3467
3468 *next_stmt_id += 1;
3469
3470 ident_stack.push(futures_ident);
3471 }
3472
3473 HydroNode::ResolveFuturesOrdered { .. } => {
3474 let input_ident = ident_stack.pop().unwrap();
3475
3476 let futures_ident =
3477 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479 match builders_or_callback {
3480 BuildersOrCallback::Builders(graph_builders) => {
3481 let builder = graph_builders.get_dfir_mut(&out_location);
3482 builder.add_dfir(
3483 parse_quote! {
3484 #futures_ident = #input_ident -> resolve_futures_ordered();
3485 },
3486 None,
3487 Some(&next_stmt_id.to_string()),
3488 );
3489 }
3490 BuildersOrCallback::Callback(_, node_callback) => {
3491 node_callback(node, next_stmt_id);
3492 }
3493 }
3494
3495 *next_stmt_id += 1;
3496
3497 ident_stack.push(futures_ident);
3498 }
3499
3500 HydroNode::Map { f, .. } => {
3501 let input_ident = ident_stack.pop().unwrap();
3502
3503 let map_ident =
3504 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3505
3506 match builders_or_callback {
3507 BuildersOrCallback::Builders(graph_builders) => {
3508 let builder = graph_builders.get_dfir_mut(&out_location);
3509 builder.add_dfir(
3510 parse_quote! {
3511 #map_ident = #input_ident -> map(#f);
3512 },
3513 None,
3514 Some(&next_stmt_id.to_string()),
3515 );
3516 }
3517 BuildersOrCallback::Callback(_, node_callback) => {
3518 node_callback(node, next_stmt_id);
3519 }
3520 }
3521
3522 *next_stmt_id += 1;
3523
3524 ident_stack.push(map_ident);
3525 }
3526
3527 HydroNode::FlatMap { f, .. } => {
3528 let input_ident = ident_stack.pop().unwrap();
3529
3530 let flat_map_ident =
3531 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3532
3533 match builders_or_callback {
3534 BuildersOrCallback::Builders(graph_builders) => {
3535 let builder = graph_builders.get_dfir_mut(&out_location);
3536 builder.add_dfir(
3537 parse_quote! {
3538 #flat_map_ident = #input_ident -> flat_map(#f);
3539 },
3540 None,
3541 Some(&next_stmt_id.to_string()),
3542 );
3543 }
3544 BuildersOrCallback::Callback(_, node_callback) => {
3545 node_callback(node, next_stmt_id);
3546 }
3547 }
3548
3549 *next_stmt_id += 1;
3550
3551 ident_stack.push(flat_map_ident);
3552 }
3553
3554 HydroNode::FlatMapStreamBlocking { f, .. } => {
3555 let input_ident = ident_stack.pop().unwrap();
3556
3557 let flat_map_stream_blocking_ident =
3558 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3559
3560 match builders_or_callback {
3561 BuildersOrCallback::Builders(graph_builders) => {
3562 let builder = graph_builders.get_dfir_mut(&out_location);
3563 builder.add_dfir(
3564 parse_quote! {
3565 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3566 },
3567 None,
3568 Some(&next_stmt_id.to_string()),
3569 );
3570 }
3571 BuildersOrCallback::Callback(_, node_callback) => {
3572 node_callback(node, next_stmt_id);
3573 }
3574 }
3575
3576 *next_stmt_id += 1;
3577
3578 ident_stack.push(flat_map_stream_blocking_ident);
3579 }
3580
3581 HydroNode::Filter { f, .. } => {
3582 let input_ident = ident_stack.pop().unwrap();
3583
3584 let filter_ident =
3585 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3586
3587 match builders_or_callback {
3588 BuildersOrCallback::Builders(graph_builders) => {
3589 let builder = graph_builders.get_dfir_mut(&out_location);
3590 builder.add_dfir(
3591 parse_quote! {
3592 #filter_ident = #input_ident -> filter(#f);
3593 },
3594 None,
3595 Some(&next_stmt_id.to_string()),
3596 );
3597 }
3598 BuildersOrCallback::Callback(_, node_callback) => {
3599 node_callback(node, next_stmt_id);
3600 }
3601 }
3602
3603 *next_stmt_id += 1;
3604
3605 ident_stack.push(filter_ident);
3606 }
3607
3608 HydroNode::FilterMap { f, .. } => {
3609 let input_ident = ident_stack.pop().unwrap();
3610
3611 let filter_map_ident =
3612 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3613
3614 match builders_or_callback {
3615 BuildersOrCallback::Builders(graph_builders) => {
3616 let builder = graph_builders.get_dfir_mut(&out_location);
3617 builder.add_dfir(
3618 parse_quote! {
3619 #filter_map_ident = #input_ident -> filter_map(#f);
3620 },
3621 None,
3622 Some(&next_stmt_id.to_string()),
3623 );
3624 }
3625 BuildersOrCallback::Callback(_, node_callback) => {
3626 node_callback(node, next_stmt_id);
3627 }
3628 }
3629
3630 *next_stmt_id += 1;
3631
3632 ident_stack.push(filter_map_ident);
3633 }
3634
3635 HydroNode::Sort { .. } => {
3636 let input_ident = ident_stack.pop().unwrap();
3637
3638 let sort_ident =
3639 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3640
3641 match builders_or_callback {
3642 BuildersOrCallback::Builders(graph_builders) => {
3643 let builder = graph_builders.get_dfir_mut(&out_location);
3644 builder.add_dfir(
3645 parse_quote! {
3646 #sort_ident = #input_ident -> sort();
3647 },
3648 None,
3649 Some(&next_stmt_id.to_string()),
3650 );
3651 }
3652 BuildersOrCallback::Callback(_, node_callback) => {
3653 node_callback(node, next_stmt_id);
3654 }
3655 }
3656
3657 *next_stmt_id += 1;
3658
3659 ident_stack.push(sort_ident);
3660 }
3661
3662 HydroNode::DeferTick { .. } => {
3663 let input_ident = ident_stack.pop().unwrap();
3664
3665 let defer_tick_ident =
3666 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3667
3668 match builders_or_callback {
3669 BuildersOrCallback::Builders(graph_builders) => {
3670 let builder = graph_builders.get_dfir_mut(&out_location);
3671 builder.add_dfir(
3672 parse_quote! {
3673 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3674 },
3675 None,
3676 Some(&next_stmt_id.to_string()),
3677 );
3678 }
3679 BuildersOrCallback::Callback(_, node_callback) => {
3680 node_callback(node, next_stmt_id);
3681 }
3682 }
3683
3684 *next_stmt_id += 1;
3685
3686 ident_stack.push(defer_tick_ident);
3687 }
3688
3689 HydroNode::Enumerate { input, .. } => {
3690 let input_ident = ident_stack.pop().unwrap();
3691
3692 let enumerate_ident =
3693 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3694
3695 match builders_or_callback {
3696 BuildersOrCallback::Builders(graph_builders) => {
3697 let builder = graph_builders.get_dfir_mut(&out_location);
3698 let lifetime = if input.metadata().location_id.is_top_level() {
3699 quote!('static)
3700 } else {
3701 quote!('tick)
3702 };
3703 builder.add_dfir(
3704 parse_quote! {
3705 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3706 },
3707 None,
3708 Some(&next_stmt_id.to_string()),
3709 );
3710 }
3711 BuildersOrCallback::Callback(_, node_callback) => {
3712 node_callback(node, next_stmt_id);
3713 }
3714 }
3715
3716 *next_stmt_id += 1;
3717
3718 ident_stack.push(enumerate_ident);
3719 }
3720
3721 HydroNode::Inspect { f, .. } => {
3722 let input_ident = ident_stack.pop().unwrap();
3723
3724 let inspect_ident =
3725 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3726
3727 match builders_or_callback {
3728 BuildersOrCallback::Builders(graph_builders) => {
3729 let builder = graph_builders.get_dfir_mut(&out_location);
3730 builder.add_dfir(
3731 parse_quote! {
3732 #inspect_ident = #input_ident -> inspect(#f);
3733 },
3734 None,
3735 Some(&next_stmt_id.to_string()),
3736 );
3737 }
3738 BuildersOrCallback::Callback(_, node_callback) => {
3739 node_callback(node, next_stmt_id);
3740 }
3741 }
3742
3743 *next_stmt_id += 1;
3744
3745 ident_stack.push(inspect_ident);
3746 }
3747
3748 HydroNode::Unique { input, .. } => {
3749 let input_ident = ident_stack.pop().unwrap();
3750
3751 let unique_ident =
3752 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3753
3754 match builders_or_callback {
3755 BuildersOrCallback::Builders(graph_builders) => {
3756 let builder = graph_builders.get_dfir_mut(&out_location);
3757 let lifetime = if input.metadata().location_id.is_top_level() {
3758 quote!('static)
3759 } else {
3760 quote!('tick)
3761 };
3762
3763 builder.add_dfir(
3764 parse_quote! {
3765 #unique_ident = #input_ident -> unique::<#lifetime>();
3766 },
3767 None,
3768 Some(&next_stmt_id.to_string()),
3769 );
3770 }
3771 BuildersOrCallback::Callback(_, node_callback) => {
3772 node_callback(node, next_stmt_id);
3773 }
3774 }
3775
3776 *next_stmt_id += 1;
3777
3778 ident_stack.push(unique_ident);
3779 }
3780
3781 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3782 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3783 if input.metadata().location_id.is_top_level()
3784 && input.metadata().collection_kind.is_bounded()
3785 {
3786 parse_quote!(fold_no_replay)
3787 } else {
3788 parse_quote!(fold)
3789 }
3790 } else if matches!(node, HydroNode::Scan { .. }) {
3791 parse_quote!(scan)
3792 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3793 parse_quote!(scan_async_blocking)
3794 } else if let HydroNode::FoldKeyed { input, .. } = node {
3795 if input.metadata().location_id.is_top_level()
3796 && input.metadata().collection_kind.is_bounded()
3797 {
3798 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3799 } else {
3800 parse_quote!(fold_keyed)
3801 }
3802 } else {
3803 unreachable!()
3804 };
3805
3806 let (HydroNode::Fold { input, .. }
3807 | HydroNode::FoldKeyed { input, .. }
3808 | HydroNode::Scan { input, .. }
3809 | HydroNode::ScanAsyncBlocking { input, .. }) = node
3810 else {
3811 unreachable!()
3812 };
3813
3814 let lifetime = if input.metadata().location_id.is_top_level() {
3815 quote!('static)
3816 } else {
3817 quote!('tick)
3818 };
3819
3820 let input_ident = ident_stack.pop().unwrap();
3821
3822 let (HydroNode::Fold { init, acc, .. }
3823 | HydroNode::FoldKeyed { init, acc, .. }
3824 | HydroNode::Scan { init, acc, .. }
3825 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3826 else {
3827 unreachable!()
3828 };
3829
3830 let fold_ident =
3831 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3832
3833 match builders_or_callback {
3834 BuildersOrCallback::Builders(graph_builders) => {
3835 if matches!(node, HydroNode::Fold { .. })
3836 && node.metadata().location_id.is_top_level()
3837 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3838 && graph_builders.singleton_intermediates()
3839 && !node.metadata().collection_kind.is_bounded()
3840 {
3841 let builder = graph_builders.get_dfir_mut(&out_location);
3842
3843 let acc: syn::Expr = parse_quote!({
3844 let mut __inner = #acc;
3845 move |__state, __value| {
3846 __inner(__state, __value);
3847 Some(__state.clone())
3848 }
3849 });
3850
3851 builder.add_dfir(
3852 parse_quote! {
3853 source_iter([(#init)()]) -> [0]#fold_ident;
3854 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3855 #fold_ident = chain();
3856 },
3857 None,
3858 Some(&next_stmt_id.to_string()),
3859 );
3860 } else if matches!(node, HydroNode::FoldKeyed { .. })
3861 && node.metadata().location_id.is_top_level()
3862 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3863 && graph_builders.singleton_intermediates()
3864 && !node.metadata().collection_kind.is_bounded()
3865 {
3866 let builder = graph_builders.get_dfir_mut(&out_location);
3867
3868 let acc: syn::Expr = parse_quote!({
3869 let mut __init = #init;
3870 let mut __inner = #acc;
3871 move |__state, __kv: (_, _)| {
3872 let __state = __state
3874 .entry(::std::clone::Clone::clone(&__kv.0))
3875 .or_insert_with(|| (__init)());
3876 __inner(__state, __kv.1);
3877 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3878 }
3879 });
3880
3881 builder.add_dfir(
3882 parse_quote! {
3883 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3884 },
3885 None,
3886 Some(&next_stmt_id.to_string()),
3887 );
3888 } else {
3889 let builder = graph_builders.get_dfir_mut(&out_location);
3890 builder.add_dfir(
3891 parse_quote! {
3892 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3893 },
3894 None,
3895 Some(&next_stmt_id.to_string()),
3896 );
3897 }
3898 }
3899 BuildersOrCallback::Callback(_, node_callback) => {
3900 node_callback(node, next_stmt_id);
3901 }
3902 }
3903
3904 *next_stmt_id += 1;
3905
3906 ident_stack.push(fold_ident);
3907 }
3908
3909 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3910 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3911 if input.metadata().location_id.is_top_level()
3912 && input.metadata().collection_kind.is_bounded()
3913 {
3914 parse_quote!(reduce_no_replay)
3915 } else {
3916 parse_quote!(reduce)
3917 }
3918 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3919 if input.metadata().location_id.is_top_level()
3920 && input.metadata().collection_kind.is_bounded()
3921 {
3922 todo!(
3923 "Calling keyed reduce on a top-level bounded collection is not supported"
3924 )
3925 } else {
3926 parse_quote!(reduce_keyed)
3927 }
3928 } else {
3929 unreachable!()
3930 };
3931
3932 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3933 else {
3934 unreachable!()
3935 };
3936
3937 let lifetime = if input.metadata().location_id.is_top_level() {
3938 quote!('static)
3939 } else {
3940 quote!('tick)
3941 };
3942
3943 let input_ident = ident_stack.pop().unwrap();
3944
3945 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3946 else {
3947 unreachable!()
3948 };
3949
3950 let reduce_ident =
3951 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3952
3953 match builders_or_callback {
3954 BuildersOrCallback::Builders(graph_builders) => {
3955 if matches!(node, HydroNode::Reduce { .. })
3956 && node.metadata().location_id.is_top_level()
3957 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3958 && graph_builders.singleton_intermediates()
3959 && !node.metadata().collection_kind.is_bounded()
3960 {
3961 todo!(
3962 "Reduce with optional intermediates is not yet supported in simulator"
3963 );
3964 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3965 && node.metadata().location_id.is_top_level()
3966 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3967 && graph_builders.singleton_intermediates()
3968 && !node.metadata().collection_kind.is_bounded()
3969 {
3970 todo!(
3971 "Reduce keyed with optional intermediates is not yet supported in simulator"
3972 );
3973 } else {
3974 let builder = graph_builders.get_dfir_mut(&out_location);
3975 builder.add_dfir(
3976 parse_quote! {
3977 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3978 },
3979 None,
3980 Some(&next_stmt_id.to_string()),
3981 );
3982 }
3983 }
3984 BuildersOrCallback::Callback(_, node_callback) => {
3985 node_callback(node, next_stmt_id);
3986 }
3987 }
3988
3989 *next_stmt_id += 1;
3990
3991 ident_stack.push(reduce_ident);
3992 }
3993
3994 HydroNode::ReduceKeyedWatermark {
3995 f,
3996 input,
3997 metadata,
3998 ..
3999 } => {
4000 let lifetime = if input.metadata().location_id.is_top_level() {
4001 quote!('static)
4002 } else {
4003 quote!('tick)
4004 };
4005
4006 let watermark_ident = ident_stack.pop().unwrap();
4008 let input_ident = ident_stack.pop().unwrap();
4009
4010 let chain_ident = syn::Ident::new(
4011 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4012 Span::call_site(),
4013 );
4014
4015 let fold_ident =
4016 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4017
4018 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4019 && input.metadata().collection_kind.is_bounded()
4020 {
4021 parse_quote!(fold_no_replay)
4022 } else {
4023 parse_quote!(fold)
4024 };
4025
4026 match builders_or_callback {
4027 BuildersOrCallback::Builders(graph_builders) => {
4028 if metadata.location_id.is_top_level()
4029 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4030 && graph_builders.singleton_intermediates()
4031 && !metadata.collection_kind.is_bounded()
4032 {
4033 todo!(
4034 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4035 )
4036 } else {
4037 let builder = graph_builders.get_dfir_mut(&out_location);
4038 builder.add_dfir(
4039 parse_quote! {
4040 #chain_ident = chain();
4041 #input_ident
4042 -> map(|x| (Some(x), None))
4043 -> [0]#chain_ident;
4044 #watermark_ident
4045 -> map(|watermark| (None, Some(watermark)))
4046 -> [1]#chain_ident;
4047
4048 #fold_ident = #chain_ident
4049 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4050 let __reduce_keyed_fn = #f;
4051 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4052 if let Some((k, v)) = opt_payload {
4053 if let Some(curr_watermark) = *opt_curr_watermark {
4054 if k < curr_watermark {
4055 return;
4056 }
4057 }
4058 match map.entry(k) {
4059 ::std::collections::hash_map::Entry::Vacant(e) => {
4060 e.insert(v);
4061 }
4062 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4063 __reduce_keyed_fn(e.get_mut(), v);
4064 }
4065 }
4066 } else {
4067 let watermark = opt_watermark.unwrap();
4068 if let Some(curr_watermark) = *opt_curr_watermark {
4069 if watermark <= curr_watermark {
4070 return;
4071 }
4072 }
4073 map.retain(|k, _| *k >= watermark);
4074 *opt_curr_watermark = Some(watermark);
4075 }
4076 }
4077 })
4078 -> flat_map(|(map, _curr_watermark)| map);
4079 },
4080 None,
4081 Some(&next_stmt_id.to_string()),
4082 );
4083 }
4084 }
4085 BuildersOrCallback::Callback(_, node_callback) => {
4086 node_callback(node, next_stmt_id);
4087 }
4088 }
4089
4090 *next_stmt_id += 1;
4091
4092 ident_stack.push(fold_ident);
4093 }
4094
4095 HydroNode::Network {
4096 networking_info,
4097 serialize_fn: serialize_pipeline,
4098 instantiate_fn,
4099 deserialize_fn: deserialize_pipeline,
4100 input,
4101 ..
4102 } => {
4103 let input_ident = ident_stack.pop().unwrap();
4104
4105 let receiver_stream_ident =
4106 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4107
4108 match builders_or_callback {
4109 BuildersOrCallback::Builders(graph_builders) => {
4110 let (sink_expr, source_expr) = match instantiate_fn {
4111 DebugInstantiate::Building => (
4112 syn::parse_quote!(DUMMY_SINK),
4113 syn::parse_quote!(DUMMY_SOURCE),
4114 ),
4115
4116 DebugInstantiate::Finalized(finalized) => {
4117 (finalized.sink.clone(), finalized.source.clone())
4118 }
4119 };
4120
4121 graph_builders.create_network(
4122 &input.metadata().location_id,
4123 &out_location,
4124 input_ident,
4125 &receiver_stream_ident,
4126 serialize_pipeline.as_ref(),
4127 sink_expr,
4128 source_expr,
4129 deserialize_pipeline.as_ref(),
4130 *next_stmt_id,
4131 networking_info,
4132 );
4133 }
4134 BuildersOrCallback::Callback(_, node_callback) => {
4135 node_callback(node, next_stmt_id);
4136 }
4137 }
4138
4139 *next_stmt_id += 1;
4140
4141 ident_stack.push(receiver_stream_ident);
4142 }
4143
4144 HydroNode::ExternalInput {
4145 instantiate_fn,
4146 deserialize_fn: deserialize_pipeline,
4147 ..
4148 } => {
4149 let receiver_stream_ident =
4150 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4151
4152 match builders_or_callback {
4153 BuildersOrCallback::Builders(graph_builders) => {
4154 let (_, source_expr) = match instantiate_fn {
4155 DebugInstantiate::Building => (
4156 syn::parse_quote!(DUMMY_SINK),
4157 syn::parse_quote!(DUMMY_SOURCE),
4158 ),
4159
4160 DebugInstantiate::Finalized(finalized) => {
4161 (finalized.sink.clone(), finalized.source.clone())
4162 }
4163 };
4164
4165 graph_builders.create_external_source(
4166 &out_location,
4167 source_expr,
4168 &receiver_stream_ident,
4169 deserialize_pipeline.as_ref(),
4170 *next_stmt_id,
4171 );
4172 }
4173 BuildersOrCallback::Callback(_, node_callback) => {
4174 node_callback(node, next_stmt_id);
4175 }
4176 }
4177
4178 *next_stmt_id += 1;
4179
4180 ident_stack.push(receiver_stream_ident);
4181 }
4182
4183 HydroNode::Counter {
4184 tag,
4185 duration,
4186 prefix,
4187 ..
4188 } => {
4189 let input_ident = ident_stack.pop().unwrap();
4190
4191 let counter_ident =
4192 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4193
4194 match builders_or_callback {
4195 BuildersOrCallback::Builders(graph_builders) => {
4196 let arg = format!("{}({})", prefix, tag);
4197 let builder = graph_builders.get_dfir_mut(&out_location);
4198 builder.add_dfir(
4199 parse_quote! {
4200 #counter_ident = #input_ident -> _counter(#arg, #duration);
4201 },
4202 None,
4203 Some(&next_stmt_id.to_string()),
4204 );
4205 }
4206 BuildersOrCallback::Callback(_, node_callback) => {
4207 node_callback(node, next_stmt_id);
4208 }
4209 }
4210
4211 *next_stmt_id += 1;
4212
4213 ident_stack.push(counter_ident);
4214 }
4215 }
4216 },
4217 seen_tees,
4218 false,
4219 );
4220
4221 ident_stack
4222 .pop()
4223 .expect("ident_stack should have exactly one element after traversal")
4224 }
4225
4226 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4227 match self {
4228 HydroNode::Placeholder => {
4229 panic!()
4230 }
4231 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4232 HydroNode::Source { source, .. } => match source {
4233 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4234 HydroSource::ExternalNetwork()
4235 | HydroSource::Spin()
4236 | HydroSource::ClusterMembers(_, _)
4237 | HydroSource::Embedded(_)
4238 | HydroSource::EmbeddedSingleton(_) => {} },
4240 HydroNode::SingletonSource { value, .. } => {
4241 transform(value);
4242 }
4243 HydroNode::CycleSource { .. }
4244 | HydroNode::Tee { .. }
4245 | HydroNode::YieldConcat { .. }
4246 | HydroNode::BeginAtomic { .. }
4247 | HydroNode::EndAtomic { .. }
4248 | HydroNode::Batch { .. }
4249 | HydroNode::Chain { .. }
4250 | HydroNode::ChainFirst { .. }
4251 | HydroNode::CrossProduct { .. }
4252 | HydroNode::CrossSingleton { .. }
4253 | HydroNode::ResolveFutures { .. }
4254 | HydroNode::ResolveFuturesBlocking { .. }
4255 | HydroNode::ResolveFuturesOrdered { .. }
4256 | HydroNode::Join { .. }
4257 | HydroNode::JoinHalf { .. }
4258 | HydroNode::Difference { .. }
4259 | HydroNode::AntiJoin { .. }
4260 | HydroNode::DeferTick { .. }
4261 | HydroNode::Enumerate { .. }
4262 | HydroNode::Unique { .. }
4263 | HydroNode::Sort { .. } => {}
4264 HydroNode::Map { f, .. }
4265 | HydroNode::FlatMap { f, .. }
4266 | HydroNode::FlatMapStreamBlocking { f, .. }
4267 | HydroNode::Filter { f, .. }
4268 | HydroNode::FilterMap { f, .. }
4269 | HydroNode::Inspect { f, .. }
4270 | HydroNode::Partition { f, .. }
4271 | HydroNode::Reduce { f, .. }
4272 | HydroNode::ReduceKeyed { f, .. }
4273 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4274 transform(f);
4275 }
4276 HydroNode::Fold { init, acc, .. }
4277 | HydroNode::Scan { init, acc, .. }
4278 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4279 | HydroNode::FoldKeyed { init, acc, .. } => {
4280 transform(init);
4281 transform(acc);
4282 }
4283 HydroNode::Network {
4284 serialize_fn,
4285 deserialize_fn,
4286 ..
4287 } => {
4288 if let Some(serialize_fn) = serialize_fn {
4289 transform(serialize_fn);
4290 }
4291 if let Some(deserialize_fn) = deserialize_fn {
4292 transform(deserialize_fn);
4293 }
4294 }
4295 HydroNode::ExternalInput { deserialize_fn, .. } => {
4296 if let Some(deserialize_fn) = deserialize_fn {
4297 transform(deserialize_fn);
4298 }
4299 }
4300 HydroNode::Counter { duration, .. } => {
4301 transform(duration);
4302 }
4303 }
4304 }
4305
4306 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4307 &self.metadata().op
4308 }
4309
4310 pub fn metadata(&self) -> &HydroIrMetadata {
4311 match self {
4312 HydroNode::Placeholder => {
4313 panic!()
4314 }
4315 HydroNode::Cast { metadata, .. } => metadata,
4316 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4317 HydroNode::Source { metadata, .. } => metadata,
4318 HydroNode::SingletonSource { metadata, .. } => metadata,
4319 HydroNode::CycleSource { metadata, .. } => metadata,
4320 HydroNode::Tee { metadata, .. } => metadata,
4321 HydroNode::Partition { metadata, .. } => metadata,
4322 HydroNode::YieldConcat { metadata, .. } => metadata,
4323 HydroNode::BeginAtomic { metadata, .. } => metadata,
4324 HydroNode::EndAtomic { metadata, .. } => metadata,
4325 HydroNode::Batch { metadata, .. } => metadata,
4326 HydroNode::Chain { metadata, .. } => metadata,
4327 HydroNode::ChainFirst { metadata, .. } => metadata,
4328 HydroNode::CrossProduct { metadata, .. } => metadata,
4329 HydroNode::CrossSingleton { metadata, .. } => metadata,
4330 HydroNode::Join { metadata, .. } => metadata,
4331 HydroNode::JoinHalf { metadata, .. } => metadata,
4332 HydroNode::Difference { metadata, .. } => metadata,
4333 HydroNode::AntiJoin { metadata, .. } => metadata,
4334 HydroNode::ResolveFutures { metadata, .. } => metadata,
4335 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4336 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4337 HydroNode::Map { metadata, .. } => metadata,
4338 HydroNode::FlatMap { metadata, .. } => metadata,
4339 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4340 HydroNode::Filter { metadata, .. } => metadata,
4341 HydroNode::FilterMap { metadata, .. } => metadata,
4342 HydroNode::DeferTick { metadata, .. } => metadata,
4343 HydroNode::Enumerate { metadata, .. } => metadata,
4344 HydroNode::Inspect { metadata, .. } => metadata,
4345 HydroNode::Unique { metadata, .. } => metadata,
4346 HydroNode::Sort { metadata, .. } => metadata,
4347 HydroNode::Scan { metadata, .. } => metadata,
4348 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4349 HydroNode::Fold { metadata, .. } => metadata,
4350 HydroNode::FoldKeyed { metadata, .. } => metadata,
4351 HydroNode::Reduce { metadata, .. } => metadata,
4352 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4353 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4354 HydroNode::ExternalInput { metadata, .. } => metadata,
4355 HydroNode::Network { metadata, .. } => metadata,
4356 HydroNode::Counter { metadata, .. } => metadata,
4357 }
4358 }
4359
4360 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4361 &mut self.metadata_mut().op
4362 }
4363
4364 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4365 match self {
4366 HydroNode::Placeholder => {
4367 panic!()
4368 }
4369 HydroNode::Cast { metadata, .. } => metadata,
4370 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4371 HydroNode::Source { metadata, .. } => metadata,
4372 HydroNode::SingletonSource { metadata, .. } => metadata,
4373 HydroNode::CycleSource { metadata, .. } => metadata,
4374 HydroNode::Tee { metadata, .. } => metadata,
4375 HydroNode::Partition { metadata, .. } => metadata,
4376 HydroNode::YieldConcat { metadata, .. } => metadata,
4377 HydroNode::BeginAtomic { metadata, .. } => metadata,
4378 HydroNode::EndAtomic { metadata, .. } => metadata,
4379 HydroNode::Batch { metadata, .. } => metadata,
4380 HydroNode::Chain { metadata, .. } => metadata,
4381 HydroNode::ChainFirst { metadata, .. } => metadata,
4382 HydroNode::CrossProduct { metadata, .. } => metadata,
4383 HydroNode::CrossSingleton { metadata, .. } => metadata,
4384 HydroNode::Join { metadata, .. } => metadata,
4385 HydroNode::JoinHalf { metadata, .. } => metadata,
4386 HydroNode::Difference { metadata, .. } => metadata,
4387 HydroNode::AntiJoin { metadata, .. } => metadata,
4388 HydroNode::ResolveFutures { metadata, .. } => metadata,
4389 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4390 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4391 HydroNode::Map { metadata, .. } => metadata,
4392 HydroNode::FlatMap { metadata, .. } => metadata,
4393 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4394 HydroNode::Filter { metadata, .. } => metadata,
4395 HydroNode::FilterMap { metadata, .. } => metadata,
4396 HydroNode::DeferTick { metadata, .. } => metadata,
4397 HydroNode::Enumerate { metadata, .. } => metadata,
4398 HydroNode::Inspect { metadata, .. } => metadata,
4399 HydroNode::Unique { metadata, .. } => metadata,
4400 HydroNode::Sort { metadata, .. } => metadata,
4401 HydroNode::Scan { metadata, .. } => metadata,
4402 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4403 HydroNode::Fold { metadata, .. } => metadata,
4404 HydroNode::FoldKeyed { metadata, .. } => metadata,
4405 HydroNode::Reduce { metadata, .. } => metadata,
4406 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4407 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4408 HydroNode::ExternalInput { metadata, .. } => metadata,
4409 HydroNode::Network { metadata, .. } => metadata,
4410 HydroNode::Counter { metadata, .. } => metadata,
4411 }
4412 }
4413
4414 pub fn input(&self) -> Vec<&HydroNode> {
4415 match self {
4416 HydroNode::Placeholder => {
4417 panic!()
4418 }
4419 HydroNode::Source { .. }
4420 | HydroNode::SingletonSource { .. }
4421 | HydroNode::ExternalInput { .. }
4422 | HydroNode::CycleSource { .. }
4423 | HydroNode::Tee { .. }
4424 | HydroNode::Partition { .. } => {
4425 vec![]
4427 }
4428 HydroNode::Cast { inner, .. }
4429 | HydroNode::ObserveNonDet { inner, .. }
4430 | HydroNode::YieldConcat { inner, .. }
4431 | HydroNode::BeginAtomic { inner, .. }
4432 | HydroNode::EndAtomic { inner, .. }
4433 | HydroNode::Batch { inner, .. } => {
4434 vec![inner]
4435 }
4436 HydroNode::Chain { first, second, .. } => {
4437 vec![first, second]
4438 }
4439 HydroNode::ChainFirst { first, second, .. } => {
4440 vec![first, second]
4441 }
4442 HydroNode::CrossProduct { left, right, .. }
4443 | HydroNode::CrossSingleton { left, right, .. }
4444 | HydroNode::Join { left, right, .. }
4445 | HydroNode::JoinHalf { left, right, .. } => {
4446 vec![left, right]
4447 }
4448 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4449 vec![pos, neg]
4450 }
4451 HydroNode::Map { input, .. }
4452 | HydroNode::FlatMap { input, .. }
4453 | HydroNode::FlatMapStreamBlocking { input, .. }
4454 | HydroNode::Filter { input, .. }
4455 | HydroNode::FilterMap { input, .. }
4456 | HydroNode::Sort { input, .. }
4457 | HydroNode::DeferTick { input, .. }
4458 | HydroNode::Enumerate { input, .. }
4459 | HydroNode::Inspect { input, .. }
4460 | HydroNode::Unique { input, .. }
4461 | HydroNode::Network { input, .. }
4462 | HydroNode::Counter { input, .. }
4463 | HydroNode::ResolveFutures { input, .. }
4464 | HydroNode::ResolveFuturesBlocking { input, .. }
4465 | HydroNode::ResolveFuturesOrdered { input, .. }
4466 | HydroNode::Fold { input, .. }
4467 | HydroNode::FoldKeyed { input, .. }
4468 | HydroNode::Reduce { input, .. }
4469 | HydroNode::ReduceKeyed { input, .. }
4470 | HydroNode::Scan { input, .. }
4471 | HydroNode::ScanAsyncBlocking { input, .. } => {
4472 vec![input]
4473 }
4474 HydroNode::ReduceKeyedWatermark {
4475 input, watermark, ..
4476 } => {
4477 vec![input, watermark]
4478 }
4479 }
4480 }
4481
4482 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4483 self.input()
4484 .iter()
4485 .map(|input_node| input_node.metadata())
4486 .collect()
4487 }
4488
4489 pub fn is_shared_with_others(&self) -> bool {
4493 match self {
4494 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4495 Rc::strong_count(&inner.0) > 1
4496 }
4497 _ => false,
4498 }
4499 }
4500
4501 pub fn print_root(&self) -> String {
4502 match self {
4503 HydroNode::Placeholder => {
4504 panic!()
4505 }
4506 HydroNode::Cast { .. } => "Cast()".to_owned(),
4507 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4508 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4509 HydroNode::SingletonSource {
4510 value,
4511 first_tick_only,
4512 ..
4513 } => format!(
4514 "SingletonSource({:?}, first_tick_only={})",
4515 value, first_tick_only
4516 ),
4517 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4518 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4519 HydroNode::Partition { f, is_true, .. } => {
4520 format!("Partition({:?}, is_true={})", f, is_true)
4521 }
4522 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4523 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4524 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4525 HydroNode::Batch { .. } => "Batch()".to_owned(),
4526 HydroNode::Chain { first, second, .. } => {
4527 format!("Chain({}, {})", first.print_root(), second.print_root())
4528 }
4529 HydroNode::ChainFirst { first, second, .. } => {
4530 format!(
4531 "ChainFirst({}, {})",
4532 first.print_root(),
4533 second.print_root()
4534 )
4535 }
4536 HydroNode::CrossProduct { left, right, .. } => {
4537 format!(
4538 "CrossProduct({}, {})",
4539 left.print_root(),
4540 right.print_root()
4541 )
4542 }
4543 HydroNode::CrossSingleton { left, right, .. } => {
4544 format!(
4545 "CrossSingleton({}, {})",
4546 left.print_root(),
4547 right.print_root()
4548 )
4549 }
4550 HydroNode::Join { left, right, .. } => {
4551 format!("Join({}, {})", left.print_root(), right.print_root())
4552 }
4553 HydroNode::JoinHalf { left, right, .. } => {
4554 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4555 }
4556 HydroNode::Difference { pos, neg, .. } => {
4557 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4558 }
4559 HydroNode::AntiJoin { pos, neg, .. } => {
4560 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4561 }
4562 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4563 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4564 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4565 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4566 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4567 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4568 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4569 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4570 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4571 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4572 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4573 HydroNode::Unique { .. } => "Unique()".to_owned(),
4574 HydroNode::Sort { .. } => "Sort()".to_owned(),
4575 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4576 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4577 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4578 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4579 }
4580 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4581 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4582 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4583 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4584 HydroNode::Network { .. } => "Network()".to_owned(),
4585 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4586 HydroNode::Counter { tag, duration, .. } => {
4587 format!("Counter({:?}, {:?})", tag, duration)
4588 }
4589 }
4590 }
4591}
4592
4593#[cfg(feature = "build")]
4594fn instantiate_network<'a, D>(
4595 env: &mut D::InstantiateEnv,
4596 from_location: &LocationId,
4597 to_location: &LocationId,
4598 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4599 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4600 name: Option<&str>,
4601 networking_info: &crate::networking::NetworkingInfo,
4602) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4603where
4604 D: Deploy<'a>,
4605{
4606 let ((sink, source), connect_fn) = match (from_location, to_location) {
4607 (&LocationId::Process(from), &LocationId::Process(to)) => {
4608 let from_node = processes
4609 .get(from)
4610 .unwrap_or_else(|| {
4611 panic!("A process used in the graph was not instantiated: {}", from)
4612 })
4613 .clone();
4614 let to_node = processes
4615 .get(to)
4616 .unwrap_or_else(|| {
4617 panic!("A process used in the graph was not instantiated: {}", to)
4618 })
4619 .clone();
4620
4621 let sink_port = from_node.next_port();
4622 let source_port = to_node.next_port();
4623
4624 (
4625 D::o2o_sink_source(
4626 env,
4627 &from_node,
4628 &sink_port,
4629 &to_node,
4630 &source_port,
4631 name,
4632 networking_info,
4633 ),
4634 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4635 )
4636 }
4637 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4638 let from_node = processes
4639 .get(from)
4640 .unwrap_or_else(|| {
4641 panic!("A process used in the graph was not instantiated: {}", from)
4642 })
4643 .clone();
4644 let to_node = clusters
4645 .get(to)
4646 .unwrap_or_else(|| {
4647 panic!("A cluster used in the graph was not instantiated: {}", to)
4648 })
4649 .clone();
4650
4651 let sink_port = from_node.next_port();
4652 let source_port = to_node.next_port();
4653
4654 (
4655 D::o2m_sink_source(
4656 env,
4657 &from_node,
4658 &sink_port,
4659 &to_node,
4660 &source_port,
4661 name,
4662 networking_info,
4663 ),
4664 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4665 )
4666 }
4667 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4668 let from_node = clusters
4669 .get(from)
4670 .unwrap_or_else(|| {
4671 panic!("A cluster used in the graph was not instantiated: {}", from)
4672 })
4673 .clone();
4674 let to_node = processes
4675 .get(to)
4676 .unwrap_or_else(|| {
4677 panic!("A process used in the graph was not instantiated: {}", to)
4678 })
4679 .clone();
4680
4681 let sink_port = from_node.next_port();
4682 let source_port = to_node.next_port();
4683
4684 (
4685 D::m2o_sink_source(
4686 env,
4687 &from_node,
4688 &sink_port,
4689 &to_node,
4690 &source_port,
4691 name,
4692 networking_info,
4693 ),
4694 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4695 )
4696 }
4697 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4698 let from_node = clusters
4699 .get(from)
4700 .unwrap_or_else(|| {
4701 panic!("A cluster used in the graph was not instantiated: {}", from)
4702 })
4703 .clone();
4704 let to_node = clusters
4705 .get(to)
4706 .unwrap_or_else(|| {
4707 panic!("A cluster used in the graph was not instantiated: {}", to)
4708 })
4709 .clone();
4710
4711 let sink_port = from_node.next_port();
4712 let source_port = to_node.next_port();
4713
4714 (
4715 D::m2m_sink_source(
4716 env,
4717 &from_node,
4718 &sink_port,
4719 &to_node,
4720 &source_port,
4721 name,
4722 networking_info,
4723 ),
4724 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4725 )
4726 }
4727 (LocationId::Tick(_, _), _) => panic!(),
4728 (_, LocationId::Tick(_, _)) => panic!(),
4729 (LocationId::Atomic(_), _) => panic!(),
4730 (_, LocationId::Atomic(_)) => panic!(),
4731 };
4732 (sink, source, connect_fn)
4733}
4734
4735#[cfg(test)]
4736mod serde_test;
4737
4738#[cfg(test)]
4739mod test {
4740 use std::mem::size_of;
4741
4742 use stageleft::{QuotedWithContext, q};
4743
4744 use super::*;
4745
4746 #[test]
4747 #[cfg_attr(
4748 not(feature = "build"),
4749 ignore = "expects inclusion of feature-gated fields"
4750 )]
4751 fn hydro_node_size() {
4752 assert_eq!(size_of::<HydroNode>(), 248);
4753 }
4754
4755 #[test]
4756 #[cfg_attr(
4757 not(feature = "build"),
4758 ignore = "expects inclusion of feature-gated fields"
4759 )]
4760 fn hydro_root_size() {
4761 assert_eq!(size_of::<HydroRoot>(), 136);
4762 }
4763
4764 #[test]
4765 fn test_simplify_q_macro_basic() {
4766 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4768 let result = simplify_q_macro(simple_expr.clone());
4769 assert_eq!(result, simple_expr);
4770 }
4771
4772 #[test]
4773 fn test_simplify_q_macro_actual_stageleft_call() {
4774 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4776 let result = simplify_q_macro(stageleft_call);
4777 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4780 }
4781
4782 #[test]
4783 fn test_closure_no_pipe_at_start() {
4784 let stageleft_call = q!({
4786 let foo = 123;
4787 move |b: usize| b + foo
4788 })
4789 .splice_fn1_ctx(&());
4790 let result = simplify_q_macro(stageleft_call);
4791 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4792 }
4793}