Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19// Re-export LinuxCompileType so users can configure compile type without depending on hydro_deploy directly.
20pub use hydro_deploy::LinuxCompileType;
21use hydro_deploy::RustCrate;
22use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
23use nanoid::nanoid;
24use proc_macro2::Span;
25use sinktools::lazy::LazySink;
26use stageleft::QuotedWithContext;
27use syn::parse_quote;
28use tar::{Builder, Header};
29use tokio::net::TcpStream;
30use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
31use tracing::{Instrument, instrument, trace, warn};
32
33use super::deploy_runtime_containerized::*;
34use crate::compile::builder::ExternalPortId;
35use crate::compile::deploy::DeployResult;
36use crate::compile::deploy_provider::{
37    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
38};
39use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
40use crate::location::dynamic::LocationId;
41use crate::location::member_id::TaglessMemberId;
42use crate::location::{LocationKey, MembershipEvent, NetworkHint};
43
/// Represents a Docker network that the containers of a deployment are attached to.
///
/// Holding this value does not by itself create anything in Docker; it only
/// carries the name under which the network is created later.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name of the network. When built through [`DockerNetwork::new`] this is the
    /// caller-supplied prefix followed by `-` and a random suffix.
    name: String,
}
49
50impl DockerNetwork {
51    /// creates a new docker network (will actually be created when deployment.start() is called).
52    pub fn new(name: String) -> Self {
53        Self {
54            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
55        }
56    }
57}
58
/// Represents a process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; recorded as the `key` field of its tracing spans.
    key: LocationKey,
    /// Name of this process. It is passed to the trybuild generation step and is
    /// also used as the Docker image name and as the basis of the container name.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this process. Filled in by `instantiate` and taken
    /// out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports registered through `expose_port`; each one becomes an
    /// `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the container for this process; `None` until the deployment is started.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional flags handed to the crate build as `rustflags`.
    compilation_options: Option<String>,

    /// Extra configuration entries, each applied to the crate build via `RustCrate::config`.
    config: Vec<String>,

    /// Docker network associated with this process.
    // NOTE(review): not read anywhere in the visible part of this file; confirm where it is consumed.
    network: DockerNetwork,

    /// Base image for the generated Dockerfile; `scratch` is used when this is `None`.
    base_image: Option<String>,

    /// Linux compile type used when building the binary for the image.
    linux_compile_type: LinuxCompileType,
}
81
82impl Node for DockerDeployProcess {
83    type Port = u16;
84    type Meta = ();
85    type InstantiateEnv = DockerDeploy;
86
87    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
88    fn next_port(&self) -> Self::Port {
89        let port = {
90            let mut borrow = self.next_port.borrow_mut();
91            let port = *borrow;
92            *borrow += 1;
93            port
94        };
95
96        port
97    }
98
99    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
100    fn update_meta(&self, _meta: &Self::Meta) {}
101
102    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
103    fn instantiate(
104        &self,
105        _env: &mut Self::InstantiateEnv,
106        meta: &mut Self::Meta,
107        graph: DfirGraph,
108        extra_stmts: &[syn::Stmt],
109        sidecars: &[syn::Expr],
110    ) {
111        let (bin_name, config) = create_graph_trybuild(
112            graph,
113            extra_stmts,
114            sidecars,
115            Some(&self.name),
116            crate::compile::trybuild::generate::DeployMode::Containerized,
117            LinkingMode::Static,
118        );
119
120        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
121            .target_dir(config.target_dir)
122            .example(bin_name)
123            .no_default_features();
124
125        ret = ret.display_name("test_display_name");
126
127        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
128
129        if let Some(features) = config.features {
130            ret = ret.features(features);
131        }
132
133        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
134        ret = ret.config("build.incremental = false");
135
136        *self.rust_crate.borrow_mut() = Some(ret);
137    }
138}
139
/// Represents a logical cluster, which can be a variable amount of individual containers.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; recorded as the `key` field of its tracing spans.
    key: LocationKey,
    /// Name of this cluster. It is passed to the trybuild generation step and is
    /// also used as the Docker image name and as the basis of each member's container name.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,
    /// Crate to build for this cluster. Filled in by `instantiate` and taken
    /// out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports registered through `expose_port`; each one becomes an
    /// `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Names of the member containers, appended one by one as the deployment starts them.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional flags handed to the crate build as `rustflags`.
    compilation_options: Option<String>,

    /// Extra configuration entries, each applied to the crate build via `RustCrate::config`.
    config: Vec<String>,

    /// Number of containers started (and later killed) for this cluster.
    count: usize,

    /// Base image for the generated Dockerfile; `scratch` is used when this is `None`.
    base_image: Option<String>,

    /// Linux compile type used when building the binary for the image.
    linux_compile_type: LinuxCompileType,
}
162
163impl Node for DockerDeployCluster {
164    type Port = u16;
165    type Meta = ();
166    type InstantiateEnv = DockerDeploy;
167
168    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
169    fn next_port(&self) -> Self::Port {
170        let port = {
171            let mut borrow = self.next_port.borrow_mut();
172            let port = *borrow;
173            *borrow += 1;
174            port
175        };
176
177        port
178    }
179
180    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
181    fn update_meta(&self, _meta: &Self::Meta) {}
182
183    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
184    fn instantiate(
185        &self,
186        _env: &mut Self::InstantiateEnv,
187        _meta: &mut Self::Meta,
188        graph: DfirGraph,
189        extra_stmts: &[syn::Stmt],
190        sidecars: &[syn::Expr],
191    ) {
192        let (bin_name, config) = create_graph_trybuild(
193            graph,
194            extra_stmts,
195            sidecars,
196            Some(&self.name),
197            crate::compile::trybuild::generate::DeployMode::Containerized,
198            LinkingMode::Static,
199        );
200
201        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
202            .target_dir(config.target_dir)
203            .example(bin_name)
204            .no_default_features();
205
206        ret = ret.display_name("test_display_name");
207
208        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
209
210        if let Some(features) = config.features {
211            ret = ret.features(features);
212        }
213
214        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
215        ret = ret.config("build.incremental = false");
216
217        *self.rust_crate.borrow_mut() = Some(ret);
218    }
219}
220
/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
#[derive(Clone, Debug)]
#[expect(
    dead_code,
    reason = "fields used via Rc<RefCell> in RegisterPort impl and ExternalBytesPort construction"
)]
pub struct DockerDeployExternal {
    /// The location key for this external, used for port handle construction.
    pub(crate) key: LocationKey,
    /// Name of this external; recorded as the `name` field of its tracing spans.
    name: String,
    /// Next port number that `next_port` will hand out.
    next_port: Rc<RefCell<u16>>,

    /// Counter for generating ExternalPortId values at deploy time.
    next_external_port_id: Rc<RefCell<crate::Counter<ExternalPortId>>>,

    /// Maps each registered external port id to the local port passed to `register`.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Keyed by local port: the (shared) name of the target container, the port
    /// inside that container, and the Docker network. Read when a connection to
    /// the deployment is opened.
    // NOTE(review): the code that populates this map is outside the visible part of the file.
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
240
241impl Node for DockerDeployExternal {
242    type Port = u16;
243    type Meta = ();
244    type InstantiateEnv = DockerDeploy;
245
246    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
247    fn next_port(&self) -> Self::Port {
248        let port = {
249            let mut borrow = self.next_port.borrow_mut();
250            let port = *borrow;
251            *borrow += 1;
252            port
253        };
254
255        port
256    }
257
258    #[instrument(level = "trace", skip_all, fields(name = self.name))]
259    fn update_meta(&self, _meta: &Self::Meta) {}
260
261    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
262    fn instantiate(
263        &self,
264        _env: &mut Self::InstantiateEnv,
265        meta: &mut Self::Meta,
266        graph: DfirGraph,
267        extra_stmts: &[syn::Stmt],
268        sidecars: &[syn::Expr],
269    ) {
270        trace!(name: "surface", surface = graph.surface_syntax_string());
271    }
272}
273
274impl DockerDeployProcess {
275    /// Expose a TCP port on this process for external access.
276    ///
277    /// The binary running on this process must bind a `TcpListener` on this port.
278    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
279    /// and is available for endpoint discovery via [`Self::get_tcp_endpoint`].
280    pub fn expose_port(&self, port: u16) {
281        self.exposed_ports.borrow_mut().push(port);
282    }
283
284    /// Returns the TCP endpoint `(host, port)` for this process exposing
285    /// the given container port. Queries Docker for the dynamically allocated
286    /// host port mapping.
287    pub async fn get_tcp_endpoint(&self, container_port: u16) -> (String, u16) {
288        let name = self
289            .docker_container_name
290            .borrow()
291            .as_ref()
292            .expect("container not yet started")
293            .clone();
294        let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
295        ("localhost".to_owned(), host_port)
296    }
297}
298
299impl DockerDeployCluster {
300    /// Expose a TCP port on every member of this cluster for external access.
301    ///
302    /// The binary running on this cluster must bind a `TcpListener` on this port.
303    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
304    /// and is available for endpoint discovery via [`Self::get_all_tcp_endpoints`].
305    pub fn expose_port(&self, port: u16) {
306        self.exposed_ports.borrow_mut().push(port);
307    }
308
309    /// Returns TCP endpoints `(host, port)` for all cluster members exposing
310    /// the given container port. Queries Docker for the dynamically allocated
311    /// host port mapping.
312    pub async fn get_all_tcp_endpoints(&self, container_port: u16) -> Vec<(String, u16)> {
313        let names = self.docker_container_name.borrow().clone();
314        let mut endpoints = Vec::with_capacity(names.len());
315        for name in names {
316            let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
317            endpoints.push(("localhost".to_owned(), host_port));
318        }
319        endpoints
320    }
321}
322
/// A boxed, type-erased pair of endpoints: a stream yielding `Out` items and a
/// sink accepting `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
327
328impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
329    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
330    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
331        self.ports.borrow_mut().insert(external_port_id, port);
332    }
333
334    fn as_bytes_bidi(
335        &self,
336        external_port_id: ExternalPortId,
337    ) -> impl Future<
338        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
339    > + 'a {
340        let guard =
341            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
342
343        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
344        let (docker_container_name, remote_port, _) = self
345            .connection_info
346            .borrow()
347            .get(&local_port)
348            .unwrap()
349            .clone();
350
351        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
352
353        async move {
354            let local_port =
355                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
356            let remote_ip_address = "localhost";
357
358            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
359
360            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
361                .await
362                .unwrap();
363
364            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
365
366            let (rx, tx) = stream.into_split();
367
368            let source = Box::pin(
369                FramedRead::new(rx, LengthDelimitedCodec::new()),
370            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
371
372            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
373                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
374
375            (source, sink)
376        }
377        .instrument(guard.exit())
378    }
379
380    fn as_bincode_bidi<InT, OutT>(
381        &self,
382        external_port_id: ExternalPortId,
383    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
384    where
385        InT: serde::Serialize + 'static,
386        OutT: serde::de::DeserializeOwned + 'static,
387    {
388        let guard =
389            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
390
391        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
392        let (docker_container_name, remote_port, _) = self
393            .connection_info
394            .borrow()
395            .get(&local_port)
396            .unwrap()
397            .clone();
398
399        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
400
401        async move {
402            let local_port =
403                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
404            let remote_ip_address = "localhost";
405
406            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
407
408            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
409                .await
410                .unwrap();
411
412            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
413
414            let (rx, tx) = stream.into_split();
415
416            let source = Box::pin(
417                FramedRead::new(rx, LengthDelimitedCodec::new())
418                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
419            ) as Pin<Box<dyn Stream<Item = OutT>>>;
420
421            let sink = Box::pin(
422                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
423                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
424                }),
425            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
426
427            (source, sink)
428        }
429        .instrument(guard.exit())
430    }
431
432    fn as_bincode_sink<T>(
433        &self,
434        external_port_id: ExternalPortId,
435    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
436    where
437        T: serde::Serialize + 'static,
438    {
439        let guard =
440            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
441
442        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
443        let (docker_container_name, remote_port, _) = self
444            .connection_info
445            .borrow()
446            .get(&local_port)
447            .unwrap()
448            .clone();
449
450        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
451
452        async move {
453            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
454            let remote_ip_address = "localhost";
455
456            Box::pin(
457                LazySink::new(move || {
458                    Box::pin(async move {
459                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
460
461                        let stream =
462                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
463                                .await?;
464
465                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
466
467                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
468                            stream,
469                            LengthDelimitedCodec::new(),
470                        ))
471                    })
472                })
473                .with(move |v| async move {
474                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
475                }),
476            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
477        }
478        .instrument(guard.exit())
479    }
480
481    fn as_bincode_source<T>(
482        &self,
483        external_port_id: ExternalPortId,
484    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
485    where
486        T: serde::de::DeserializeOwned + 'static,
487    {
488        let guard =
489            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
490
491        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
492        let (docker_container_name, remote_port, _) = self
493            .connection_info
494            .borrow()
495            .get(&local_port)
496            .unwrap()
497            .clone();
498
499        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
500
501        async move {
502
503            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
504            let remote_ip_address = "localhost";
505
506            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
507
508            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
509                .await
510                .unwrap();
511
512            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
513
514            Box::pin(
515                FramedRead::new(stream, LengthDelimitedCodec::new())
516                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
517            ) as Pin<Box<dyn Stream<Item = T>>>
518        }
519        .instrument(guard.exit())
520    }
521}
522
523#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
524async fn find_dynamically_allocated_docker_port(
525    docker_container_name: &str,
526    destination_port: u16,
527) -> u16 {
528    let docker = Docker::connect_with_local_defaults().unwrap();
529
530    let container_info = docker
531        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
532        .await
533        .unwrap();
534
535    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
536
537    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
538    let remote_port = container_info
539        .network_settings
540        .as_ref()
541        .unwrap()
542        .ports
543        .as_ref()
544        .unwrap()
545        .get(&format!("{destination_port}/tcp"))
546        .unwrap()
547        .as_ref()
548        .unwrap()
549        .iter()
550        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
551        .unwrap()
552        .host_port
553        .as_ref()
554        .unwrap()
555        .parse()
556        .unwrap();
557
558    remote_port
559}
560
/// For deploying to a local docker instance
pub struct DockerDeploy {
    /// Copies of the process specs handed out by `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Copies of the cluster specs handed out by `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Docker network that is created on `start` and that every container is attached to.
    network: DockerNetwork,
    /// Random identifier of this deployment; passed to every container in the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
568
569#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
570async fn create_and_start_container(
571    docker: &Docker,
572    container_name: &str,
573    image_name: &str,
574    network_name: &str,
575    deployment_instance: &str,
576) -> Result<(), anyhow::Error> {
577    let config = ContainerCreateBody {
578        image: Some(image_name.to_owned()),
579        hostname: Some(container_name.to_owned()),
580        host_config: Some(HostConfig {
581            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
582            publish_all_ports: Some(true),
583            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
584            ..Default::default()
585        }),
586        env: Some(vec![
587            format!("CONTAINER_NAME={container_name}"),
588            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
589            format!("RUST_LOG=trace"),
590        ]),
591        networking_config: Some(NetworkingConfig {
592            endpoints_config: Some(HashMap::from([(
593                network_name.to_owned(),
594                EndpointSettings {
595                    ..Default::default()
596                },
597            )])),
598        }),
599        tty: Some(true),
600        ..Default::default()
601    };
602
603    let options = CreateContainerOptions {
604        name: Some(container_name.to_owned()),
605        ..Default::default()
606    };
607
608    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
609    docker.create_container(Some(options), config).await?;
610    docker
611        .start_container(container_name, None::<StartContainerOptions>)
612        .await?;
613
614    Ok(())
615}
616
617#[instrument(level = "trace", skip_all, fields(%image_name))]
618async fn build_and_create_image(
619    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
620    compilation_options: Option<&str>,
621    config: &[String],
622    exposed_ports: &[u16],
623    image_name: &str,
624    base_image: Option<&str>,
625    linux_compile_type: LinuxCompileType,
626) -> Result<(), anyhow::Error> {
627    let mut rust_crate = rust_crate
628        .borrow_mut()
629        .take()
630        .unwrap()
631        .rustflags(compilation_options.unwrap_or_default());
632
633    for cfg in config {
634        rust_crate = rust_crate.config(cfg);
635    }
636
637    let build_output = match build_crate_memoized(
638        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(linux_compile_type)),
639    )
640    .await
641    {
642        Ok(build_output) => build_output,
643        Err(BuildError::FailedToBuildCrate {
644            exit_status,
645            diagnostics,
646            text_lines,
647            stderr_lines,
648        }) => {
649            let diagnostics = diagnostics
650                .into_iter()
651                .map(|d| d.rendered.unwrap())
652                .collect::<Vec<_>>()
653                .join("\n");
654            let text_lines = text_lines.join("\n");
655            let stderr_lines = stderr_lines.join("\n");
656
657            anyhow::bail!(
658                r#"
659Failed to build crate {exit_status:?}
660--- diagnostics
661---
662{diagnostics}
663---
664---
665---
666
667--- text_lines
668---
669---
670{text_lines}
671---
672---
673---
674
675--- stderr_lines
676---
677---
678{stderr_lines}
679---
680---
681---"#
682            );
683        }
684        Err(err) => {
685            anyhow::bail!("Failed to build crate {err:?}");
686        }
687    };
688
689    let docker = Docker::connect_with_local_defaults()?;
690
691    let mut tar_data = Vec::new();
692    {
693        let mut tar = Builder::new(&mut tar_data);
694
695        let exposed_ports = exposed_ports
696            .iter()
697            .map(|port| format!("EXPOSE {port}/tcp"))
698            .collect::<Vec<_>>()
699            .join("\n");
700
701        let from_image = base_image.unwrap_or("scratch");
702        let dockerfile_content = format!(
703            r#"
704                FROM {from_image}
705                {exposed_ports}
706                COPY app /app
707                CMD ["/app"]
708            "#,
709        );
710
711        trace!(name: "dockerfile", %dockerfile_content);
712
713        let mut header = Header::new_gnu();
714        header.set_path("Dockerfile")?;
715        header.set_size(dockerfile_content.len() as u64);
716        header.set_cksum();
717        tar.append(&header, dockerfile_content.as_bytes())?;
718
719        let mut header = Header::new_gnu();
720        header.set_path("app")?;
721        header.set_size(build_output.bin_data.len() as u64);
722        header.set_mode(0o755);
723        header.set_cksum();
724        tar.append(&header, &build_output.bin_data[..])?;
725
726        tar.finish()?;
727    }
728
729    let build_options = BuildImageOptions {
730        dockerfile: "Dockerfile".to_owned(),
731        t: Some(image_name.to_owned()),
732        rm: true,
733        ..Default::default()
734    };
735
736    use bollard::errors::Error;
737
738    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
739    let mut build_stream = docker.build_image(build_options, None, Some(body));
740    while let Some(msg) = build_stream.next().await {
741        match msg {
742            Ok(_) => {}
743            Err(e) => match e {
744                Error::DockerStreamError { error } => {
745                    return Err(anyhow::anyhow!(
746                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
747                    ));
748                }
749                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
750            },
751        }
752    }
753
754    Ok(())
755}
756
757impl DockerDeploy {
758    /// Create a new deployment
759    pub fn new(network: DockerNetwork) -> Self {
760        Self {
761            docker_processes: Vec::new(),
762            docker_clusters: Vec::new(),
763            network,
764            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
765        }
766    }
767
768    /// Add an internal docker service to the deployment.
769    pub fn add_localhost_docker(
770        &mut self,
771        compilation_options: Option<String>,
772        config: Vec<String>,
773    ) -> DockerDeployProcessSpec {
774        let process = DockerDeployProcessSpec {
775            compilation_options,
776            config,
777            network: self.network.clone(),
778            deployment_instance: self.deployment_instance.clone(),
779            base_image: None,
780            linux_compile_type: LinuxCompileType::Musl,
781        };
782
783        self.docker_processes.push(process.clone());
784
785        process
786    }
787
788    /// Add an internal docker cluster to the deployment.
789    pub fn add_localhost_docker_cluster(
790        &mut self,
791        compilation_options: Option<String>,
792        config: Vec<String>,
793        count: usize,
794    ) -> DockerDeployClusterSpec {
795        let cluster = DockerDeployClusterSpec {
796            compilation_options,
797            config,
798            count,
799            deployment_instance: self.deployment_instance.clone(),
800            base_image: None,
801            linux_compile_type: LinuxCompileType::Musl,
802        };
803
804        self.docker_clusters.push(cluster.clone());
805
806        cluster
807    }
808
    /// Add an external process to the deployment.
    ///
    /// This only wraps `name` in a [`DockerDeployExternalSpec`]; unlike the
    /// `add_localhost_docker*` methods, nothing is recorded on the deployment itself.
    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
        DockerDeployExternalSpec { name }
    }
813
    /// Get the deployment instance from this deployment.
    ///
    /// Returns an owned copy of the random identifier generated when this
    /// deployment was constructed.
    pub fn get_deployment_instance(&self) -> String {
        self.deployment_instance.clone()
    }
818
819    /// Create docker images.
820    #[instrument(level = "trace", skip_all)]
821    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
822        for (_, _, process) in nodes.get_all_processes() {
823            let exposed_ports = process.exposed_ports.borrow().clone();
824
825            build_and_create_image(
826                &process.rust_crate,
827                process.compilation_options.as_deref(),
828                &process.config,
829                &exposed_ports,
830                &process.name,
831                process.base_image.as_deref(),
832                process.linux_compile_type,
833            )
834            .await?;
835        }
836
837        for (_, _, cluster) in nodes.get_all_clusters() {
838            let exposed_ports = cluster.exposed_ports.borrow().clone();
839            build_and_create_image(
840                &cluster.rust_crate,
841                cluster.compilation_options.as_deref(),
842                &cluster.config,
843                &exposed_ports,
844                &cluster.name,
845                cluster.base_image.as_deref(),
846                cluster.linux_compile_type,
847            )
848            .await?;
849        }
850
851        Ok(())
852    }
853
854    /// Start the deployment, tell docker to create containers from the existing provisioned images.
855    #[instrument(level = "trace", skip_all)]
856    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857        let docker = Docker::connect_with_local_defaults()?;
858
859        match docker
860            .create_network(NetworkCreateRequest {
861                name: self.network.name.clone(),
862                driver: Some("bridge".to_owned()),
863                ..Default::default()
864            })
865            .await
866        {
867            Ok(v) => v.id,
868            Err(e) => {
869                panic!("Failed to create docker network: {e:?}");
870            }
871        };
872
873        for (_, _, process) in nodes.get_all_processes() {
874            let docker_container_name: String = get_docker_container_name(&process.name, None);
875            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
876
877            create_and_start_container(
878                &docker,
879                &docker_container_name,
880                &process.name,
881                &self.network.name,
882                &self.deployment_instance,
883            )
884            .await?;
885        }
886
887        for (_, _, cluster) in nodes.get_all_clusters() {
888            for num in 0..cluster.count {
889                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
890                cluster
891                    .docker_container_name
892                    .borrow_mut()
893                    .push(docker_container_name.clone());
894
895                create_and_start_container(
896                    &docker,
897                    &docker_container_name,
898                    &cluster.name,
899                    &self.network.name,
900                    &self.deployment_instance,
901                )
902                .await?;
903            }
904        }
905
906        Ok(())
907    }
908
909    /// Stop the deployment, destroy all containers
910    #[instrument(level = "trace", skip_all)]
911    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
912        let docker = Docker::connect_with_local_defaults()?;
913
914        for (_, _, process) in nodes.get_all_processes() {
915            let docker_container_name: String = get_docker_container_name(&process.name, None);
916
917            docker
918                .kill_container(&docker_container_name, None::<KillContainerOptions>)
919                .await?;
920        }
921
922        for (_, _, cluster) in nodes.get_all_clusters() {
923            for num in 0..cluster.count {
924                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
925
926                docker
927                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
928                    .await?;
929            }
930        }
931
932        Ok(())
933    }
934
935    /// remove containers, images, and networks.
936    #[instrument(level = "trace", skip_all)]
937    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
938        let docker = Docker::connect_with_local_defaults()?;
939
940        for (_, _, process) in nodes.get_all_processes() {
941            let docker_container_name: String = get_docker_container_name(&process.name, None);
942
943            docker
944                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
945                .await?;
946        }
947
948        for (_, _, cluster) in nodes.get_all_clusters() {
949            for num in 0..cluster.count {
950                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
951
952                docker
953                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
954                    .await?;
955            }
956        }
957
958        docker
959            .remove_network(&self.network.name)
960            .await
961            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
962
963        use bollard::query_parameters::RemoveImageOptions;
964
965        for (_, _, process) in nodes.get_all_processes() {
966            docker
967                .remove_image(&process.name, None::<RemoveImageOptions>, None)
968                .await?;
969        }
970
971        for (_, _, cluster) in nodes.get_all_clusters() {
972            docker
973                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
974                .await?;
975        }
976
977        Ok(())
978    }
979}
980
981impl<'a> Deploy<'a> for DockerDeploy {
982    type Meta = ();
983    type InstantiateEnv = Self;
984
985    type Process = DockerDeployProcess;
986    type Cluster = DockerDeployCluster;
987    type External = DockerDeployExternal;
988
989    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
990    fn o2o_sink_source(
991        _env: &mut Self::InstantiateEnv,
992        p1: &Self::Process,
993        p1_port: &<Self::Process as Node>::Port,
994        p2: &Self::Process,
995        p2_port: &<Self::Process as Node>::Port,
996        name: Option<&str>,
997        networking_info: &crate::networking::NetworkingInfo,
998    ) -> (syn::Expr, syn::Expr) {
999        match networking_info {
1000            crate::networking::NetworkingInfo::Tcp {
1001                fault: crate::networking::TcpFault::FailStop,
1002            } => {}
1003            _ => panic!("Unsupported networking info: {:?}", networking_info),
1004        }
1005
1006        deploy_containerized_o2o(
1007            &p2.name,
1008            name.expect("channel name is required for containerized deployment"),
1009        )
1010    }
1011
1012    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1013    fn o2o_connect(
1014        p1: &Self::Process,
1015        p1_port: &<Self::Process as Node>::Port,
1016        p2: &Self::Process,
1017        p2_port: &<Self::Process as Node>::Port,
1018    ) -> Box<dyn FnOnce()> {
1019        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1020
1021        Box::new(move || {
1022            trace!(name: "o2o_connect thunk", %serialized);
1023        })
1024    }
1025
1026    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1027    fn o2m_sink_source(
1028        _env: &mut Self::InstantiateEnv,
1029        p1: &Self::Process,
1030        p1_port: &<Self::Process as Node>::Port,
1031        c2: &Self::Cluster,
1032        c2_port: &<Self::Cluster as Node>::Port,
1033        name: Option<&str>,
1034        networking_info: &crate::networking::NetworkingInfo,
1035    ) -> (syn::Expr, syn::Expr) {
1036        match networking_info {
1037            crate::networking::NetworkingInfo::Tcp {
1038                fault: crate::networking::TcpFault::FailStop,
1039            } => {}
1040            _ => panic!("Unsupported networking info: {:?}", networking_info),
1041        }
1042
1043        deploy_containerized_o2m(
1044            name.expect("channel name is required for containerized deployment"),
1045        )
1046    }
1047
1048    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1049    fn o2m_connect(
1050        p1: &Self::Process,
1051        p1_port: &<Self::Process as Node>::Port,
1052        c2: &Self::Cluster,
1053        c2_port: &<Self::Cluster as Node>::Port,
1054    ) -> Box<dyn FnOnce()> {
1055        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
1056
1057        Box::new(move || {
1058            trace!(name: "o2m_connect thunk", %serialized);
1059        })
1060    }
1061
1062    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1063    fn m2o_sink_source(
1064        _env: &mut Self::InstantiateEnv,
1065        c1: &Self::Cluster,
1066        c1_port: &<Self::Cluster as Node>::Port,
1067        p2: &Self::Process,
1068        p2_port: &<Self::Process as Node>::Port,
1069        name: Option<&str>,
1070        networking_info: &crate::networking::NetworkingInfo,
1071    ) -> (syn::Expr, syn::Expr) {
1072        match networking_info {
1073            crate::networking::NetworkingInfo::Tcp {
1074                fault: crate::networking::TcpFault::FailStop,
1075            } => {}
1076            _ => panic!("Unsupported networking info: {:?}", networking_info),
1077        }
1078
1079        deploy_containerized_m2o(
1080            &p2.name,
1081            name.expect("channel name is required for containerized deployment"),
1082        )
1083    }
1084
1085    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1086    fn m2o_connect(
1087        c1: &Self::Cluster,
1088        c1_port: &<Self::Cluster as Node>::Port,
1089        p2: &Self::Process,
1090        p2_port: &<Self::Process as Node>::Port,
1091    ) -> Box<dyn FnOnce()> {
1092        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1093
1094        Box::new(move || {
1095            trace!(name: "m2o_connect thunk", %serialized);
1096        })
1097    }
1098
1099    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1100    fn m2m_sink_source(
1101        _env: &mut Self::InstantiateEnv,
1102        c1: &Self::Cluster,
1103        c1_port: &<Self::Cluster as Node>::Port,
1104        c2: &Self::Cluster,
1105        c2_port: &<Self::Cluster as Node>::Port,
1106        name: Option<&str>,
1107        networking_info: &crate::networking::NetworkingInfo,
1108    ) -> (syn::Expr, syn::Expr) {
1109        match networking_info {
1110            crate::networking::NetworkingInfo::Tcp {
1111                fault: crate::networking::TcpFault::FailStop,
1112            } => {}
1113            _ => panic!("Unsupported networking info: {:?}", networking_info),
1114        }
1115
1116        deploy_containerized_m2m(
1117            name.expect("channel name is required for containerized deployment"),
1118        )
1119    }
1120
1121    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1122    fn m2m_connect(
1123        c1: &Self::Cluster,
1124        c1_port: &<Self::Cluster as Node>::Port,
1125        c2: &Self::Cluster,
1126        c2_port: &<Self::Cluster as Node>::Port,
1127    ) -> Box<dyn FnOnce()> {
1128        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1129
1130        Box::new(move || {
1131            trace!(name: "m2m_connect thunk", %serialized);
1132        })
1133    }
1134
1135    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1136    fn e2o_many_source(
1137        extra_stmts: &mut Vec<syn::Stmt>,
1138        p2: &Self::Process,
1139        p2_port: &<Self::Process as Node>::Port,
1140        codec_type: &syn::Type,
1141        shared_handle: String,
1142    ) -> syn::Expr {
1143        p2.exposed_ports.borrow_mut().push(*p2_port);
1144
1145        let socket_ident = syn::Ident::new(
1146            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1147            Span::call_site(),
1148        );
1149
1150        let source_ident = syn::Ident::new(
1151            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1152            Span::call_site(),
1153        );
1154
1155        let sink_ident = syn::Ident::new(
1156            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1157            Span::call_site(),
1158        );
1159
1160        let membership_ident = syn::Ident::new(
1161            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1162            Span::call_site(),
1163        );
1164
1165        let bind_addr = format!("0.0.0.0:{}", p2_port);
1166
1167        extra_stmts.push(syn::parse_quote! {
1168            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1169        });
1170
1171        let root = crate::staging_util::get_this_crate();
1172
1173        extra_stmts.push(syn::parse_quote! {
1174            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1175        });
1176
1177        parse_quote!(#source_ident)
1178    }
1179
1180    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1181    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1182        let sink_ident = syn::Ident::new(
1183            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1184            Span::call_site(),
1185        );
1186        parse_quote!(#sink_ident)
1187    }
1188
1189    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1190    fn e2o_source(
1191        extra_stmts: &mut Vec<syn::Stmt>,
1192        p1: &Self::External,
1193        p1_port: &<Self::External as Node>::Port,
1194        p2: &Self::Process,
1195        p2_port: &<Self::Process as Node>::Port,
1196        _codec_type: &syn::Type,
1197        shared_handle: String,
1198    ) -> syn::Expr {
1199        p1.connection_info.borrow_mut().insert(
1200            *p1_port,
1201            (
1202                p2.docker_container_name.clone(),
1203                *p2_port,
1204                p2.network.clone(),
1205            ),
1206        );
1207
1208        p2.exposed_ports.borrow_mut().push(*p2_port);
1209
1210        let socket_ident = syn::Ident::new(
1211            &format!("__hydro_deploy_{}_socket", &shared_handle),
1212            Span::call_site(),
1213        );
1214
1215        let source_ident = syn::Ident::new(
1216            &format!("__hydro_deploy_{}_source", &shared_handle),
1217            Span::call_site(),
1218        );
1219
1220        let sink_ident = syn::Ident::new(
1221            &format!("__hydro_deploy_{}_sink", &shared_handle),
1222            Span::call_site(),
1223        );
1224
1225        let bind_addr = format!("0.0.0.0:{}", p2_port);
1226
1227        extra_stmts.push(syn::parse_quote! {
1228            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1229        });
1230
1231        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1232
1233        extra_stmts.push(syn::parse_quote! {
1234            let (#sink_ident, #source_ident) = (#create_expr).split();
1235        });
1236
1237        parse_quote!(#source_ident)
1238    }
1239
1240    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1241    fn e2o_connect(
1242        p1: &Self::External,
1243        p1_port: &<Self::External as Node>::Port,
1244        p2: &Self::Process,
1245        p2_port: &<Self::Process as Node>::Port,
1246        many: bool,
1247        server_hint: NetworkHint,
1248    ) -> Box<dyn FnOnce()> {
1249        if server_hint != NetworkHint::Auto {
1250            panic!(
1251                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1252                server_hint
1253            );
1254        }
1255
1256        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1257        if many {
1258            p1.connection_info.borrow_mut().insert(
1259                *p1_port,
1260                (
1261                    p2.docker_container_name.clone(),
1262                    *p2_port,
1263                    p2.network.clone(),
1264                ),
1265            );
1266        }
1267
1268        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1269
1270        Box::new(move || {
1271            trace!(name: "e2o_connect thunk", %serialized);
1272        })
1273    }
1274
1275    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1276    fn o2e_sink(
1277        p1: &Self::Process,
1278        p1_port: &<Self::Process as Node>::Port,
1279        p2: &Self::External,
1280        p2_port: &<Self::External as Node>::Port,
1281        shared_handle: String,
1282    ) -> syn::Expr {
1283        let sink_ident = syn::Ident::new(
1284            &format!("__hydro_deploy_{}_sink", &shared_handle),
1285            Span::call_site(),
1286        );
1287        parse_quote!(#sink_ident)
1288    }
1289
1290    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1291    fn cluster_ids(
1292        of_cluster: LocationKey,
1293    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1294        cluster_ids()
1295    }
1296
1297    #[instrument(level = "trace", skip_all)]
1298    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1299        cluster_self_id()
1300    }
1301
1302    #[instrument(level = "trace", skip_all, fields(?location_id))]
1303    fn cluster_membership_stream(
1304        _env: &mut Self::InstantiateEnv,
1305        _at_location: &LocationId,
1306        location_id: &LocationId,
1307    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1308    {
1309        cluster_membership_stream(location_id)
1310    }
1311}
1312
/// Lowercase alphanumeric alphabet: the digits `0`-`9` followed by the letters `a`-`z`.
// NOTE(review): presumably the character set for generated identifiers; the use site is
// outside this section.
const CONTAINER_ALPHABET: [char; 36] = [
    // digits
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    // letters
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1317
/// Returns `true` when `name` is made of one or more non-empty runs of lowercase ASCII letters
/// and digits, joined by single `.`, `_` or `-` separators.
///
/// This accepts exactly the strings matched by `^[a-z0-9]+([._-][a-z0-9]+)*$`, but checks them
/// in a single pass instead of compiling a regular expression on every call. The empty string,
/// leading/trailing separators, doubled separators, uppercase letters and any other character
/// (including a trailing newline) are rejected.
fn is_valid_docker_image_name(name: &str) -> bool {
    // Splitting on the separators turns "no empty run" into "no empty segment": a leading,
    // trailing or doubled separator (and the empty input) all produce an empty segment.
    name.split(['.', '_', '-']).all(|segment| {
        !segment.is_empty()
            && segment
                .bytes()
                .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit())
    })
}
1323
1324#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1325fn get_docker_image_name(
1326    name_hint: &str,
1327    location_key: LocationKey,
1328    deployment_instance: &str,
1329) -> String {
1330    let name_hint: String = name_hint
1331        .split("::")
1332        .last()
1333        .unwrap()
1334        .to_ascii_lowercase()
1335        .split(['.', '_', '-'])
1336        .filter(|s| !s.is_empty())
1337        .collect::<Vec<_>>()
1338        .join("-");
1339
1340    let image_name = format!("hy-{name_hint}-{deployment_instance}-{location_key}");
1341
1342    if !is_valid_docker_image_name(&image_name) {
1343        panic!(
1344            "Generated Docker image name '{image_name}' is not a valid Docker image name. \
1345             Docker image names may only contain lowercase alphanumeric characters \
1346             separated by single '.', '_', or '-' characters, and must start and end \
1347             with an alphanumeric character. The most likely cause is your location \
1348             struct name '{name_hint}'"
1349        );
1350    }
1351
1352    image_name
1353}
1354
1355#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1356fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1357    if let Some(instance) = instance {
1358        format!("{image_name}-{instance}")
1359    } else {
1360        image_name.to_owned()
1361    }
1362}
1363/// Represents a Process running in a docker container
1364#[derive(Clone)]
1365pub struct DockerDeployProcessSpec {
1366    compilation_options: Option<String>,
1367    config: Vec<String>,
1368    network: DockerNetwork,
1369    deployment_instance: String,
1370    base_image: Option<String>,
1371    linux_compile_type: LinuxCompileType,
1372}
1373
1374impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1375    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1376    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1377        DockerDeployProcess {
1378            key,
1379            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1380
1381            next_port: Rc::new(RefCell::new(1000)),
1382            rust_crate: Rc::new(RefCell::new(None)),
1383
1384            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1385
1386            docker_container_name: Rc::new(RefCell::new(None)),
1387
1388            compilation_options: self.compilation_options,
1389            config: self.config,
1390
1391            network: self.network.clone(),
1392
1393            base_image: self.base_image,
1394            linux_compile_type: self.linux_compile_type,
1395        }
1396    }
1397}
1398
1399/// Represents a Cluster running across `count` docker containers.
1400#[derive(Clone)]
1401pub struct DockerDeployClusterSpec {
1402    compilation_options: Option<String>,
1403    config: Vec<String>,
1404    count: usize,
1405    deployment_instance: String,
1406    base_image: Option<String>,
1407    linux_compile_type: LinuxCompileType,
1408}
1409
1410impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1411    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1412    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1413        DockerDeployCluster {
1414            key,
1415            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1416
1417            next_port: Rc::new(RefCell::new(1000)),
1418            rust_crate: Rc::new(RefCell::new(None)),
1419
1420            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1421
1422            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1423
1424            compilation_options: self.compilation_options,
1425            config: self.config,
1426
1427            count: self.count,
1428
1429            base_image: self.base_image,
1430            linux_compile_type: self.linux_compile_type,
1431        }
1432    }
1433}
1434
1435impl DockerDeployProcessSpec {
1436    /// Set the base Docker image for this process.
1437    /// Defaults to `scratch` if not specified.
1438    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1439        self.base_image = Some(image.into());
1440        self
1441    }
1442
1443    /// Set the Linux compile type (glibc or musl) for this process.
1444    /// Defaults to `Musl` if not specified.
1445    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1446        self.linux_compile_type = compile_type;
1447        self
1448    }
1449}
1450
1451impl DockerDeployClusterSpec {
1452    /// Set the base Docker image for this cluster.
1453    /// Defaults to `scratch` if not specified.
1454    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1455        self.base_image = Some(image.into());
1456        self
1457    }
1458
1459    /// Set the Linux compile type (glibc or musl) for this cluster.
1460    /// Defaults to `Musl` if not specified.
1461    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1462        self.linux_compile_type = compile_type;
1463        self
1464    }
1465}
1466
1467/// Represents an external process outside of the management of hydro deploy.
1468pub struct DockerDeployExternalSpec {
1469    name: String,
1470}
1471
1472impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1473    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1474    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1475        DockerDeployExternal {
1476            key,
1477            name: self.name,
1478            next_port: Rc::new(RefCell::new(10000)),
1479            next_external_port_id: Rc::new(RefCell::new(crate::Counter::default())),
1480            ports: Rc::new(RefCell::new(HashMap::new())),
1481            connection_info: Rc::new(RefCell::new(HashMap::new())),
1482        }
1483    }
1484}