pispas_modules/
utils.rs

1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info, warn};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
24/// TLS certificate and private key for `local.unpispas.es`.
25///
26/// Issued by Cloudflare Origin CA and embedded at compile time. The CA root
27/// is imported into the Windows trust store during install, so browsers that
28/// connect to `wss://local.unpispas.es:5005` see a publicly-valid chain.
29const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
30const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34/// Loads services based on the environment configuration.
35///
36/// This function reads the required modules from the `MODULES` environment variable.
37/// It initializes and registers the corresponding services into a `HashMap`.
38pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39    let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41    // Get the required services from the environment variable or use a default value.
42    let required_services = std::env::var("MODULES")
43        .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45    
46    info!("Loading persistence module: cache");
47    let mut persistence_service = crate::persistence::PersistenceService::new();
48    if let Err(e) = persistence_service.initialize().await {
49        error!("Failed to initialize persistence service: {}", e);
50    }
51    // Share the Arc: keeping one copy in the service registry (exposed to
52    // WS clients as "cache") and handing clones to services that cache
53    // state across sessions (paytef).
54    let persistence_arc = Arc::new(persistence_service);
55    services.insert("cache".to_string(), persistence_arc.clone());
56    // Iterate through the list of service names and instantiate the appropriate service.
57    for service_name in required_services.split(',') {
58        match service_name.trim() {
59            "base" => {
60                services.insert("base".to_string(), Arc::new(BaseService::new()));
61            }
62            "print" => {
63                let print_service = PrintService::new(config.clone()).await;
64                services.insert("print".to_string(), Arc::new(print_service));
65            }
66            "scale" => {
67                services.insert("scale".to_string(), Arc::new(ScaleService::new()));
68            }
69            "rfid" => {
70                services.insert("rfid".to_string(), Arc::new(RfidService::new()));
71            }
72            "cashguard" => {
73                services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
74            }
75            "paytef" => {
76                services.insert("paytef".to_string(), Arc::new(PaytefService::new(Some(persistence_arc.clone()))));
77            }
78            "mypos" => {
79                services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
80            }
81            "commandViewer" => {
82                std::thread::spawn(|| {
83                    let rt = tokio::runtime::Runtime::new().unwrap();
84                    rt.block_on(async {
85                        if let Err(e) = crate::command_viewer::start_kitchen_server().await {
86                            eprintln!("Failed to start Actix server: {:?}", e);
87                        }
88                    });
89                });
90                
91                services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
92            }
93            "OrderKitchen" => {
94                services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
95            }
96            _ => {
97                error!("Unknown service: {}", service_name.trim());
98            }
99        }
100    }
101    info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
102    services
103}
104
105/// Returns `true` when `origin` matches any pattern in
106/// [`sharing::PNA_ALLOWED_ORIGINS`] or is a loopback origin.
107///
108/// Loopback origins (`http(s)://localhost[:port]`,
109/// `http(s)://127.0.0.1[:port]`, and the special value `"null"`) are
110/// always allowed so devtools, native apps, and file-served pages keep
111/// working. Host comparison is **exact** — `localhost.evil.com` does
112/// **not** count as loopback.
113///
114/// Allow-list patterns support:
115///   * exact match,
116///   * `scheme://*.domain` — wildcard subdomain. Matches both the apex
117///     (`domain`) and any subdomain (`x.domain`, `x.y.domain`). This is
118///     intentional so a single pattern covers the root and its children.
119///   * `scheme://host:*` — wildcard port.
120fn is_origin_allowed(origin: &str) -> bool {
121    if origin == "null" || is_loopback_origin(origin) {
122        return true;
123    }
124    sharing::PNA_ALLOWED_ORIGINS
125        .iter()
126        .any(|pattern| match_origin_pattern(pattern, origin))
127}
128
129/// Exact-host loopback check. Parses the origin and compares scheme + host
130/// against `localhost` / `127.0.0.1` / `[::1]` so sneaky hosts like
131/// `localhost.evil.com` do not slip through.
132fn is_loopback_origin(origin: &str) -> bool {
133    let (scheme, rest) = match origin.split_once("://") {
134        Some(v) => v,
135        None => return false,
136    };
137    if scheme != "http" && scheme != "https" {
138        return false;
139    }
140    // IPv6 origins are bracketed: `http://[::1]:port`. Strip the host piece
141    // carefully so the colons inside the brackets do not confuse us.
142    let host = if let Some(after_bracket) = rest.strip_prefix('[') {
143        match after_bracket.find(']') {
144            Some(end) => &rest[..end + 2], // include "[" and "]"
145            None => return false,
146        }
147    } else {
148        rest.split(':').next().unwrap_or(rest)
149    };
150    host == "localhost" || host == "127.0.0.1" || host == "[::1]"
151}
152
153/// Matches a single `scheme://host[:port]` origin against a pattern.
154///
155/// See [`is_origin_allowed`] for the pattern grammar. Pure string work,
156/// no regex, no allocations except `format!` in the subdomain branch.
157fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
158    // Split scheme://host[:port]
159    let (p_scheme, p_rest) = match pattern.split_once("://") {
160        Some(v) => v,
161        None => return false,
162    };
163    let (o_scheme, o_rest) = match origin.split_once("://") {
164        Some(v) => v,
165        None => return false,
166    };
167    if p_scheme != o_scheme {
168        return false;
169    }
170
171    // Split host from port
172    let (p_host, p_port) = p_rest.split_once(':').unwrap_or((p_rest, ""));
173    let (o_host, o_port) = o_rest.split_once(':').unwrap_or((o_rest, ""));
174
175    // Port: "*" matches any, "" matches only empty, exact otherwise
176    let port_ok = match p_port {
177        "" => o_port.is_empty(),
178        "*" => true,
179        _ => p_port == o_port,
180    };
181    if !port_ok {
182        return false;
183    }
184
185    // Host: "*.foo.com" matches both the apex "foo.com" and any subdomain
186    // ("x.foo.com", "x.y.foo.com"). Exact patterns match only the same host.
187    if let Some(suffix) = p_host.strip_prefix("*.") {
188        o_host == suffix || o_host.ends_with(&format!(".{}", suffix))
189    } else {
190        p_host == o_host
191    }
192}
193
194#[cfg(test)]
195mod origin_tests {
196    use super::*;
197
198    #[test]
199    fn loopback_exact_hosts_allowed() {
200        assert!(is_loopback_origin("http://localhost"));
201        assert!(is_loopback_origin("https://localhost:3000"));
202        assert!(is_loopback_origin("http://127.0.0.1"));
203        assert!(is_loopback_origin("http://[::1]"));
204    }
205
206    #[test]
207    fn loopback_lookalikes_rejected() {
208        // The bug Copilot reported must not regress.
209        assert!(!is_loopback_origin("http://localhost.evil.com"));
210        assert!(!is_loopback_origin("http://127.0.0.1.evil.com"));
211        assert!(!is_loopback_origin("ws://localhost"));
212        assert!(!is_loopback_origin("ftp://localhost"));
213    }
214
215    #[test]
216    fn pattern_subdomain_matches_apex_and_children() {
217        assert!(match_origin_pattern(
218            "https://*.unpispas.es",
219            "https://unpispas.es"
220        ));
221        assert!(match_origin_pattern(
222            "https://*.unpispas.es",
223            "https://app.unpispas.es"
224        ));
225        assert!(match_origin_pattern(
226            "https://*.unpispas.es",
227            "https://a.b.unpispas.es"
228        ));
229    }
230
231    #[test]
232    fn pattern_wildcard_port() {
233        assert!(match_origin_pattern(
234            "https://*.unpispas.es:*",
235            "https://app.unpispas.es:8443"
236        ));
237        assert!(match_origin_pattern(
238            "https://*.unpispas.es:*",
239            "https://app.unpispas.es"
240        ));
241    }
242
243    #[test]
244    fn pattern_exact_port_enforced() {
245        assert!(match_origin_pattern(
246            "https://app.example.com:443",
247            "https://app.example.com:443"
248        ));
249        assert!(!match_origin_pattern(
250            "https://app.example.com:443",
251            "https://app.example.com:8080"
252        ));
253    }
254
255    #[test]
256    fn pattern_scheme_mismatch_rejected() {
257        assert!(!match_origin_pattern(
258            "https://*.unpispas.es",
259            "http://app.unpispas.es"
260        ));
261    }
262
263    #[test]
264    fn non_pna_origin_not_allowed() {
265        assert!(!is_origin_allowed("https://evil.com"));
266        assert!(!is_origin_allowed("https://unpispas.es.evil.com"));
267    }
268}
269
270/// Builds a TLS acceptor from the embedded cert + key.
271fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
272    let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
273    let tls = native_tls::TlsAcceptor::builder(identity).build()?;
274    Ok(tokio_native_tls::TlsAcceptor::from(tls))
275}
276
277/// Starts a local WebSocket server (plain WS or WSS depending on `use_tls`).
278///
279/// When `use_tls` is true the server presents the embedded `local.unpispas.es`
280/// certificate so browsers can connect via `wss://local.unpispas.es:<port>`
281/// without the Chrome *Local Network Access* prompt.
282pub async fn start_local_server(
283    host: &str,
284    port: u16,
285    services: Arc<HashMap<String, Arc<dyn Service>>>,
286    service_name: &str,
287    service_vers: &str,
288    use_tls: bool,
289) -> Result<(), std::io::Error> {
290    let addr = format!("{}:{}", host, port);
291
292    let tls_acceptor = if use_tls {
293        match build_tls_acceptor() {
294            Ok(a) => {
295                info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
296                Some(a)
297            }
298            Err(e) => {
299                error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
300                None
301            }
302        }
303    } else {
304        None
305    };
306
307    loop {
308        match TcpListener::bind(&addr).await {
309            Ok(listener) => {
310                let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
311                info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
312
313                while let Ok((stream, peer)) = listener.accept().await {
314                    if let Err(e) = stream.set_nodelay(true) {
315                        debug!("set_nodelay (peer={}): {}", peer, e);
316                    }
317                    let services = Arc::clone(&services);
318                    let service_name = service_name.to_string();
319                    let service_vers = service_vers.to_string();
320                    let tls = tls_acceptor.clone();
321
322                    tokio::spawn(async move {
323                        // Dual-mode: peek first byte to detect TLS (0x16) vs plain HTTP
324                        let mut first_byte = [0u8; 1];
325                        let n = match stream.peek(&mut first_byte).await {
326                            Ok(n) => n,
327                            Err(e) => {
328                                debug!("peek failed (peer={}): {}", peer, e);
329                                return;
330                            }
331                        };
332                        if n == 0 {
333                            return;
334                        }
335
336                        let is_tls = first_byte[0] == 0x16;
337                        if is_tls {
338                            // TLS handshake first, then WS
339                            if let Some(acceptor) = tls {
340                                match acceptor.accept(stream).await {
341                                    Ok(tls_stream) => {
342                                        if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
343                                            error!("WSS handler error (peer={}): {}", peer, e);
344                                        }
345                                    }
346                                    Err(e) => {
347                                        debug!("TLS handshake failed (peer={}): {}", peer, e);
348                                    }
349                                }
350                            } else {
351                                debug!("Got TLS handshake but TLS disabled (peer={})", peer);
352                            }
353                        } else {
354                            // Plain HTTP — legacy ws:// clients
355                            if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
356                                error!("WS handler error (peer={}): {}", peer, e);
357                            }
358                        }
359                    });
360                }
361            }
362            Err(e) => {
363                error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
364                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
365            }
366        }
367    }
368}
369
370/// Processes an incoming request by routing it to the appropriate service.
371///
372/// This function handles normalization of incoming messages, extracting metadata,
373/// and calling the appropriate service based on the `TARGET` field in the message.
374pub async fn process_request(
375    mut request: Value,
376    services: Arc<HashMap<String, Arc<dyn Service>>>,
377    service_name: &str,
378    service_vers: &str,
379    write: WebSocketWrite,
380) -> Value {
381    // Attempt to extract UUID from the message or use a default value if absent.
382    let uuid = request
383        .get("UUIDV4")
384        .or_else(|| request.get("MESSAGE_UUID"))
385        .cloned()
386        .unwrap_or(Value::Null);
387
388    // Normalize the message if it contains a `MESSAGE_DATA` field with nested content.
389    if let Some(message_data) = request.get("MESSAGE_DATA") {
390        if message_data.is_object() {
391            info!("Normalizing MESSAGE_DATA for remote request");
392            request = message_data.clone();
393
394            // Ensure `UUIDV4` is present in the normalized message.
395            if uuid != Value::Null && !request.get("UUIDV4").is_some() {
396                request["UUIDV4"] = uuid.clone();
397            }
398        }
399    }
400
401    // Check if the message is a connection response.
402    if request.get("result").is_some() && request.get("server").is_some(){
403        info!("Received CONNECT response: {:?}", request);
404
405        // PROVISIONED → REGISTERED: the WSR returns the backend's PK
406        // for this TPV (`objid` field, RASPPI.objid in the legacy
407        // schema). Persist it so future cert-issuer requests can use
408        // it and so we never have to re-resolve.
409        if let Some(objid) = request.get("objid").and_then(|v| v.as_i64()) {
410            let mut cfg = sharing::utils::ConfigEnv::load();
411            if cfg.objid != Some(objid) {
412                info!("WSR delivered objid={objid} (was {:?}) — persisting", cfg.objid);
413                cfg.set_objid(objid);
414            }
415        }
416
417        return json!({
418            "SERVICE_NAME": service_name,
419            "SERVICE_VERS": service_vers,
420            "MESSAGE_TYPE": "INFO",
421            "MESSAGE_EXEC": "SUCCESS",
422            "MESSAGE_UUID": uuid,
423            "MESSAGE_DATA": "Server connection acknowledged",
424        });
425    }
426
427    // Route the request to the target service specified in the `TARGET` field.
428    if let Some(target) = request.get("TARGET").and_then(Value::as_str) {
429        if let Some(service) = services.get(target.to_lowercase().as_str()) {
430            let target_lc = target.to_lowercase();
431            // Keep-alive PING (BASE/PING) is silenced to debug so the log
432            // doesn't fill up with ping chatter every 30s per client.
433            let is_ping = target_lc == "base"
434                && request.get("ACTION").and_then(Value::as_str) == Some("PING");
435            if is_ping {
436                debug!("Running service: {}", target_lc);
437            } else {
438                info!("Running service: {}", target_lc);
439            }
440
441            // Execute the service and build the response.
442            let (status, response) = service.run(request.clone(), write).await;
443            let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
444                // Si es JSON válido, usar el Value directamente (sin escapar)
445                json_value
446            } else {
447                // Si no es JSON, usar como string
448                Value::String(response)
449            };
450            return json!({
451                "SERVICE_NAME": service_name,
452                "SERVICE_VERS": service_vers,
453                "MESSAGE_TYPE": "RESPONSE",
454                "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
455                "MESSAGE_UUID": uuid,
456                "MESSAGE_DATA": message_data,
457            });
458        } else {
459            error!("Unknown service: {}", target);
460            return json!({
461                "SERVICE_NAME": service_name,
462                "SERVICE_VERS": service_vers,
463                "MESSAGE_TYPE": "RESPONSE",
464                "MESSAGE_EXEC": "FAILURE",
465                "MESSAGE_UUID": uuid,
466                "MESSAGE_DATA": format!("Unknown service: {}", target),
467            });
468        }
469    }
470
471    if request.get("SERVICE_NAME").is_some() {
472        error!("Missing TARGET in request.");
473    }
474
475
476    json!({
477        "SERVICE_NAME": service_name,
478        "SERVICE_VERS": service_vers,
479        "MESSAGE_TYPE": "RESPONSE",
480        "MESSAGE_EXEC": "FAILURE",
481        "MESSAGE_UUID": uuid,
482        "MESSAGE_DATA": "Missing TARGET in request.",
483    })
484}
485
486/// Handles a single WebSocket connection end-to-end.
487///
488/// The connection has already passed the dual-mode (TLS vs plain) detection
489/// in `start_local_server`, so `stream` is the appropriate concrete stream
490/// type. We wrap it in [`crate::prelude::BoxedStream`] and run the WS
491/// handshake with [`tokio_tungstenite::accept_hdr_async`] so we can:
492///
493/// 1. Validate the `Origin` header against `sharing::PNA_ALLOWED_ORIGINS`
494///    (with wildcard subdomain / wildcard port matching). Disallowed
495///    origins get HTTP 403.
496/// 2. Inject `Access-Control-Allow-Private-Network: true` into the
497///    101 Switching Protocols response so Chrome's PNA check passes.
498///
499/// After the handshake we read frames, `serde_json`-parse them, and
500/// dispatch to `process_request` which routes by the `TARGET` field.
501///
502/// Note: Chrome does **not** send an OPTIONS preflight before a WebSocket
503/// upgrade for PNA — the PNA check happens inside the WS handshake itself.
504/// So we do not have a separate OPTIONS path.
505pub async fn websocket_handler<S>(
506    stream: S,
507    peer: SocketAddr,
508    services: Arc<HashMap<String, Arc<dyn Service>>>,
509    service_name: &str,
510    service_vers: &str,
511) -> Result<(), Box<dyn std::error::Error>>
512where
513    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
514{
515    let boxed = crate::prelude::BoxedStream(Box::new(stream));
516
517    // Validate Origin against the configured allow-list and inject PNA headers
518    // into the 101 Switching Protocols response.
519    let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
520        // Helper: build a 400 ErrorResponse when a header value fails to
521        // parse. The previous code used `.parse().unwrap()` which would
522        // panic the server thread on a malformed Origin (trivial DoS).
523        fn bad_request(reason: &str) -> ErrorResponse {
524            let mut resp = ErrorResponse::new(Some(reason.to_string()));
525            *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::BAD_REQUEST;
526            resp
527        }
528
529        // The two static values are fixed, ASCII-safe constants — they're
530        // built once and reused for every handshake. If they ever fail to
531        // parse it's a programming error, not user input.
532        let pna_true: tokio_tungstenite::tungstenite::http::HeaderValue =
533            "true".parse().expect("Access-Control-Allow-Private-Network value parses");
534        let vary_origin: tokio_tungstenite::tungstenite::http::HeaderValue =
535            "Origin".parse().expect("Vary header value parses");
536
537        let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
538        match origin {
539            None => {
540                // Non-browser clients (native apps, CLI) often omit Origin — allow.
541                let h = response.headers_mut();
542                h.insert("Access-Control-Allow-Private-Network", pna_true);
543                Ok(response)
544            }
545            Some(o) if is_origin_allowed(o) => {
546                // Origin is user-controlled. A weird/malformed string can
547                // make HeaderValue::from_str fail; reject with 400 instead
548                // of panicking the server.
549                let origin_value = match o.parse::<tokio_tungstenite::tungstenite::http::HeaderValue>() {
550                    Ok(v) => v,
551                    Err(_) => {
552                        error!("Rejected WS handshake: Origin header is not a valid HeaderValue: {}", o);
553                        return Err(bad_request("Invalid Origin header"));
554                    }
555                };
556                let h = response.headers_mut();
557                h.insert("Access-Control-Allow-Private-Network", pna_true);
558                h.insert("Access-Control-Allow-Origin", origin_value);
559                h.insert("Vary", vary_origin);
560                Ok(response)
561            }
562            Some(o) => {
563                error!("Rejected WS handshake from disallowed origin: {}", o);
564                let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
565                *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
566                Err(resp)
567            }
568        }
569    };
570
571    let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
572        Ok(ws) => {
573            debug!("WebSocket handshake OK (peer={})", peer);
574            ws
575        }
576        Err(e) => {
577            error!(
578                "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
579                peer, e
580            );
581            return Err(e.into());
582        }
583    };
584    let (write, mut read) = ws_stream.split();
585
586    // Create a write lock to manage sending responses back to the client.
587    let rw_write = Arc::new(tokio::sync::RwLock::new(write));
588
589    while let Some(msg) = read.next().await {
590        match msg {
591            Ok(Message::Text(text)) => {
592                let parsed = serde_json::from_str::<serde_json::Value>(&text);
593
594                // Quiet keep-alive pings so they don't drown real traffic.
595                // Anything else (CHECK, print requests, etc.) stays at info.
596                let is_ping = parsed
597                    .as_ref()
598                    .ok()
599                    .and_then(|req| req.get("ACTION"))
600                    .and_then(Value::as_str)
601                    == Some("PING");
602                let truncated = if text.len() > 200 {
603                    format!("{}...", &text[..200])
604                } else {
605                    text.clone()
606                };
607                if is_ping {
608                    debug!("Received text message HANDLER (peer={}): {}", peer, truncated);
609                } else {
610                    info!("Received text message HANDLER (peer={}): {}", peer, truncated);
611                }
612
613                match parsed {
614                    Ok(request) => {
615                        let response_message = process_request(
616                            request,
617                            Arc::clone(&services),
618                            service_name,
619                            service_vers,
620                            Some(Arc::clone(&rw_write)),
621                        )
622                            .await;
623
624                        let mut ws = rw_write.write().await;
625                        if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
626                            error!("Failed to send response: {}", e);
627                        }
628                    }
629                    Err(e) => {
630                        //response error!
631                        let error_response = json!({
632                            "SERVICE_NAME": service_name,
633                            "SERVICE_VERS": service_vers,
634                            "MESSAGE_TYPE": "RESPONSE",
635                            "MESSAGE_EXEC": "FAILURE",
636                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
637                        });
638                        let mut ws = rw_write.write().await;
639                        if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
640                            error!("Failed to send error response: {}", e);
641                        }
642                        error!("Invalid JSON: {}", e);
643                    }
644                }
645            }
646            Err(e) => {
647                let error_response = json!({
648                            "SERVICE_NAME": service_name,
649                            "SERVICE_VERS": service_vers,
650                            "MESSAGE_TYPE": "RESPONSE",
651                            "MESSAGE_EXEC": "FAILURE",
652                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
653                        });
654                let mut ws = rw_write.write().await;
655                if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
656                    error!("Failed to send error response: {}", e);
657                }
658                debug!("Error processing message: {}", e)
659            },
660            _ => {}
661        }
662    }
663
664    Ok(())
665}
666
667/// Starts the remote WebSocket server with reconnection and message handling.
668///
669/// This function keeps trying to connect to the remote server in a loop,
670/// reconnecting automatically if the connection fails.
671///
672/// **Bootstrap gate**: the remote server is the WSR (`wss.unpispas.es`)
673/// and expects `place_id` in the CONNECT envelope so it can resolve the
674/// caller to a `RASPPI` row. While `ConfigEnv.place_id` is `None` (TPV
675/// in PRE-BOOTSTRAP, no client has done CHECK + place_id yet), we wait
676/// instead of connecting — sending a CONNECT without identity would
677/// just be rejected and waste a slot in the WSR's pending map.
678pub async fn start_remote_server(
679    remote_host: &str,
680    remote_port: u16,
681    service_name: &str,
682    service_vers: &str,
683    services: Arc<HashMap<String, Arc<dyn Service>>>,
684) -> Result<(), std::io::Error> {
685    loop {
686        // Refresh ConfigEnv on every iteration. The CHECK handler in
687        // base.rs writes place_id to the .env when the first client
688        // pairs the TPV; we pick it up here on the next loop.
689        let cfg = sharing::utils::ConfigEnv::load();
690        let Some(place_id) = cfg.place_id else {
691            info!(
692                "Remote WSR connect deferred — TPV in PRE-BOOTSTRAP \
693                 (waiting for a client CHECK with place_id)"
694            );
695            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
696            continue;
697        };
698
699        match connect_to_remote_server(
700            remote_host,
701            remote_port,
702            service_name,
703            service_vers,
704            place_id,
705            Arc::clone(&services),
706            false,
707        )
708            .await
709        {
710            Ok(_) => info!("Successfully connected to remote server."),
711            Err(e) => {
712                error!("Failed to connect to remote server: {:?}", e);
713                info!("Reconnecting in 5 seconds...");
714                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
715            }
716        }
717    }
718}
719
720/// Detect the host's primary IPv4 on the LAN. Returns a string suitable
721/// for the CONNECT envelope. On any failure we send `"0.0.0.0"` —
722/// that's a clear signal to the backend that discovery failed without
723/// taking the WSR connection down.
724fn discover_local_ipv4() -> String {
725    match local_ip_address::local_ip() {
726        Ok(ip) if ip.is_ipv4() => ip.to_string(),
727        Ok(ip) => {
728            warn!("local IP is not IPv4 ({ip}); cert flow will not work over LAN");
729            ip.to_string()
730        }
731        Err(e) => {
732            warn!("could not discover local IP: {e}");
733            "0.0.0.0".to_string()
734        }
735    }
736}
737
738/// Handles a single connection to the remote WebSocket server.
739///
740/// This function sends a `CONNECT` message to the server upon successful connection
741/// and processes incoming messages by routing them to the appropriate services.
742async fn connect_to_remote_server(
743    remote_host: &str,
744    remote_port: u16,
745    service_name: &str,
746    service_vers: &str,
747    place_id: i64,
748    services: Arc<HashMap<String, Arc<dyn Service>>>,
749    return_after_connect: bool,
750) -> Result<(), Box<dyn std::error::Error>> {
751    let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
752        "wss"
753    } else {
754        "ws"
755    };
756
757    let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
758    info!("Connecting to remote WebSocket server at: {}", url);
759
760    let ws_url = Url::parse(&url)?;
761    let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
762    let (mut write, mut read) = ws_stream.split();
763
764    let local_ip = discover_local_ipv4();
765    info!("CONNECT envelope: place_id={place_id} local_ip={local_ip}");
766
767    let connect_message = json!({
768        "SERVICE_NAME": service_name,
769        "SERVICE_VERS": sharing::VERSION,
770        "MESSAGE_TYPE": "CONNECT",
771        // New bootstrap fields. The WSR resolves
772        // (place_id, service_name) → objid and returns objid in the
773        // response. If the WSR is the legacy server.py it ignores
774        // these and we keep working (just no cert-issuer flow).
775        "PLACE_ID": place_id,
776        "LOCAL_IP": local_ip,
777        "MODULE_LIST": services.iter().map(|(name, service)| {
778            json!({
779                "MODULE_NAME": name,
780                "MODULE_VERS": service.get_version(),
781            })
782        }).collect::<Vec<_>>(),
783    });
784
785    write.send(Message::Text(connect_message.to_string())).await?;
786    info!("CONNECT message sent.");
787
788    if return_after_connect {
789        return Ok(());
790    }
791
792    let write_p = Arc::new(tokio::sync::Mutex::new(write));
793    let service_name = service_name.to_owned();
794    let service_vers = service_vers.to_owned();
795
796    // Shared cancel signal so a keep-alive failure aborts the read loop
797    // immediately instead of leaving it parked on the 45 s timeout below.
798    // Without this the link sat zombie for up to 45 s after a half-open
799    // TCP, which is what made operators restart the "pispas modules"
800    // service by hand.
801    let cancel_token = Arc::new(tokio::sync::Notify::new());
802
803    // Keep-alive: envía PING cada 25 segundos
804    let keep_alive_write = Arc::clone(&write_p);
805    let keep_alive_cancel = Arc::clone(&cancel_token);
806    let keep_alive_task = tokio::spawn(async move {
807        let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
808
809        loop {
810            interval.tick().await;
811
812            let mut ws = keep_alive_write.lock().await;
813            if let Err(e) = ws.send(Message::Ping(vec![])).await {
814                error!("Keep-alive failed: {}", e);
815                // notify_one (vs notify_waiters) holds a permit if the read
816                // loop hasn't reached its `notified()` yet, so the signal
817                // can't be missed on a fast failure.
818                keep_alive_cancel.notify_one();
819                break;
820            }
821        }
822    });
823
824    // Inactivity timeout — if the server stops sending traffic (or its TCP
825    // reset got dropped by a NAT), give up and reconnect. 45 s is twice the
826    // server's ping_interval (20–30 s) so a single missed pong won't trip it.
827    const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(45);
828
829    let read_cancel = Arc::clone(&cancel_token);
830    let result = async {
831        loop {
832            tokio::select! {
833                biased;
834                _ = read_cancel.notified() => {
835                    error!("Keep-alive task signalled connection loss, breaking read loop");
836                    break;
837                }
838                next = tokio::time::timeout(READ_TIMEOUT, read.next()) => {
839                    match next {
840                        Ok(Some(Ok(Message::Text(text)))) => {
841                            info!("Received message");
842                            if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
843                                let response = process_request(
844                                    request,
845                                    Arc::clone(&services),
846                                    &service_name,
847                                    &service_vers,
848                                    None,
849                                ).await;
850
851                                let mut ws = write_p.lock().await;
852                                ws.send(Message::Text(response.to_string())).await?;
853                            }
854                        }
855                        Ok(Some(Ok(Message::Ping(ping)))) => {
856                            debug!("Received PING from server, sending PONG");
857                            let mut ws = write_p.lock().await;
858                            ws.send(Message::Pong(ping)).await?;
859                        }
860                        Ok(Some(Ok(Message::Pong(_)))) => {
861                            // PONG recibido - conexión sana
862                        }
863                        Ok(Some(Ok(Message::Close(_)))) => {
864                            error!("Connection closed by server");
865                            break;
866                        }
867                        Ok(Some(Err(e))) => {
868                            error!("WebSocket error: {}", e);
869                            break;
870                        }
871                        Ok(None) => {
872                            error!("Connection closed");
873                            break;
874                        }
875                        Err(_) => {
876                            error!(
877                                "No activity in {} seconds, reconnecting",
878                                READ_TIMEOUT.as_secs()
879                            );
880                            break;
881                        }
882                        // Message::Binary / Message::Frame — not part of our
883                        // protocol, but tungstenite can deliver them. Just
884                        // ignore and keep listening.
885                        Ok(Some(Ok(_))) => {}
886                    }
887                }
888            }
889        }
890        Ok::<(), Box<dyn std::error::Error>>(())
891    }.await;
892
893    keep_alive_task.abort();
894    let _ = keep_alive_task.await;
895    result
896}
897
898/// Sends a message to the WebSocket connection.
899///
900/// # Arguments
901/// - `write`: The WebSocket connection.
902/// - `message_data`: The message to send.
903pub async fn send_message(write: &WebSocketWrite, message_data: String) {
904    if let Some(ws_lock) = &write {
905        let mut ws = ws_lock.write().await;
906
907        match ws.send(Message::Text(message_data.clone())).await {
908            Ok(_) => {
909                info!("Message sent successfully: {}", message_data);
910            }
911            Err(e) => {
912                error!("Failed to send message: {:?}", e);
913            }
914        }
915    } else {
916        info!("No WebSocket available to send message.");
917    }
918}
919