pispas_modules/
utils.rs

1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
/// TLS certificate and private key for `local.unpispas.es`.
///
/// Issued by Cloudflare Origin CA and embedded at compile time. The CA root
/// is imported into the Windows trust store during install, so browsers that
/// connect to `wss://local.unpispas.es:5005` see a publicly-valid chain.
const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
/// Private key paired with [`LOCAL_TLS_CERT`], embedded at compile time from
/// `resources/local_unpispas.key`.
///
/// NOTE(review): because the key is compiled into the binary, anyone holding
/// the executable can extract it — confirm this is acceptable for the
/// `local.unpispas.es` certificate.
const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34/// Loads services based on the environment configuration.
35///
36/// This function reads the required modules from the `MODULES` environment variable.
37/// It initializes and registers the corresponding services into a `HashMap`.
38pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39    let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41    // Get the required services from the environment variable or use a default value.
42    let required_services = std::env::var("MODULES")
43        .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45    
46    info!("Loading persistence module: cache");
47    let mut persistence_service = crate::persistence::PersistenceService::new();
48    if let Err(e) = persistence_service.initialize().await {
49        error!("Failed to initialize persistence service: {}", e);
50    }
51    // Share the Arc: keeping one copy in the service registry (exposed to
52    // WS clients as "cache") and handing clones to services that cache
53    // state across sessions (paytef).
54    let persistence_arc = Arc::new(persistence_service);
55    services.insert("cache".to_string(), persistence_arc.clone());
56    // Iterate through the list of service names and instantiate the appropriate service.
57    for service_name in required_services.split(',') {
58        match service_name.trim() {
59            "base" => {
60                services.insert("base".to_string(), Arc::new(BaseService::new()));
61            }
62            "print" => {
63                let print_service = PrintService::new(config.clone()).await;
64                services.insert("print".to_string(), Arc::new(print_service));
65            }
66            "scale" => {
67                services.insert("scale".to_string(), Arc::new(ScaleService::new()));
68            }
69            "rfid" => {
70                services.insert("rfid".to_string(), Arc::new(RfidService::new()));
71            }
72            "cashguard" => {
73                services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
74            }
75            "paytef" => {
76                services.insert("paytef".to_string(), Arc::new(PaytefService::new(Some(persistence_arc.clone()))));
77            }
78            "mypos" => {
79                services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
80            }
81            "commandViewer" => {
82                std::thread::spawn(|| {
83                    let rt = tokio::runtime::Runtime::new().unwrap();
84                    rt.block_on(async {
85                        if let Err(e) = crate::command_viewer::start_kitchen_server().await {
86                            eprintln!("Failed to start Actix server: {:?}", e);
87                        }
88                    });
89                });
90                
91                services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
92            }
93            "OrderKitchen" => {
94                services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
95            }
96            _ => {
97                error!("Unknown service: {}", service_name.trim());
98            }
99        }
100    }
101    info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
102    services
103}
104
105/// Returns `true` when `origin` matches any pattern in
106/// [`sharing::PNA_ALLOWED_ORIGINS`] or is a loopback origin.
107///
108/// Loopback origins (`http(s)://localhost[:port]`,
109/// `http(s)://127.0.0.1[:port]`, and the special value `"null"`) are
110/// always allowed so devtools, native apps, and file-served pages keep
111/// working. Host comparison is **exact** — `localhost.evil.com` does
112/// **not** count as loopback.
113///
114/// Allow-list patterns support:
115///   * exact match,
116///   * `scheme://*.domain` — wildcard subdomain. Matches both the apex
117///     (`domain`) and any subdomain (`x.domain`, `x.y.domain`). This is
118///     intentional so a single pattern covers the root and its children.
119///   * `scheme://host:*` — wildcard port.
120fn is_origin_allowed(origin: &str) -> bool {
121    if origin == "null" || is_loopback_origin(origin) {
122        return true;
123    }
124    sharing::PNA_ALLOWED_ORIGINS
125        .iter()
126        .any(|pattern| match_origin_pattern(pattern, origin))
127}
128
/// Exact-host loopback check.
///
/// Returns `true` only for `http`/`https` origins whose host is exactly
/// `localhost`, `127.0.0.1` or `[::1]`, optionally followed by `:port`.
/// Look-alike hosts such as `localhost.evil.com` or `[::1].evil.com` are
/// rejected, as is any origin without a `scheme://` prefix or with another
/// scheme.
fn is_loopback_origin(origin: &str) -> bool {
    let (scheme, rest) = match origin.split_once("://") {
        Some(parts) => parts,
        None => return false,
    };
    if !matches!(scheme, "http" | "https") {
        return false;
    }

    let host = match rest.strip_prefix('[') {
        // IPv6 origins are bracketed: `http://[::1]:port`. Cut at the closing
        // bracket so the colons inside the address are not taken for a port
        // separator.
        Some(after_bracket) => {
            let close = match after_bracket.find(']') {
                Some(idx) => idx,
                None => return false,
            };
            // `close` indexes into `after_bracket`; +1 for the stripped "[" and
            // +1 to include the "]" itself.
            let (bracketed, tail) = rest.split_at(close + 2);
            // Only an optional `:port` may follow the bracket. Without this
            // check `[::1].evil.com` would be accepted as loopback.
            if !(tail.is_empty() || tail.starts_with(':')) {
                return false;
            }
            bracketed
        }
        None => rest.split(':').next().unwrap_or(rest),
    };

    matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
152
/// Tests one `scheme://host[:port]` origin against one allow-list pattern.
///
/// Rules, applied in order:
///   * both values must contain `://` and their schemes must be equal;
///   * port: a pattern port of `*` accepts anything (including no port),
///     otherwise the two port strings must be identical (an absent port is
///     the empty string);
///   * host: a pattern host of `*.domain` accepts `domain` itself and any
///     host ending in `.domain`; any other pattern host must be identical.
///
/// Pure string slicing: no regex and no allocation.
fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
    /// Splits `scheme://host[:port]` into `(scheme, host, port)`; the port is
    /// `""` when absent. Returns `None` when there is no `://`.
    fn split_parts(value: &str) -> Option<(&str, &str, &str)> {
        let (scheme, rest) = value.split_once("://")?;
        let (host, port) = rest.split_once(':').unwrap_or((rest, ""));
        Some((scheme, host, port))
    }

    let (p_scheme, p_host, p_port) = match split_parts(pattern) {
        Some(parts) => parts,
        None => return false,
    };
    let (o_scheme, o_host, o_port) = match split_parts(origin) {
        Some(parts) => parts,
        None => return false,
    };

    if p_scheme != o_scheme {
        return false;
    }

    // "*" is the wildcard; everything else (including the empty "no port"
    // case) is a plain equality test.
    let port_matches = p_port == "*" || p_port == o_port;
    if !port_matches {
        return false;
    }

    match p_host.strip_prefix("*.") {
        // What remains in front of the domain must be empty (apex) or end in
        // a dot (a real subdomain label boundary).
        Some(domain) => match o_host.strip_suffix(domain) {
            Some(labels) => labels.is_empty() || labels.ends_with('.'),
            None => false,
        },
        None => p_host == o_host,
    }
}
193
#[cfg(test)]
mod origin_tests {
    use super::*;

    /// Wildcard-subdomain pattern without a port.
    const ANY_SUBDOMAIN: &str = "https://*.unpispas.es";
    /// Wildcard-subdomain pattern that also accepts any port.
    const ANY_SUBDOMAIN_ANY_PORT: &str = "https://*.unpispas.es:*";
    /// Exact host with an explicit port.
    const EXACT_WITH_PORT: &str = "https://app.example.com:443";

    #[test]
    fn loopback_exact_hosts_allowed() {
        let accepted = [
            "http://localhost",
            "https://localhost:3000",
            "http://127.0.0.1",
            "http://[::1]",
        ];
        for origin in &accepted {
            assert!(is_loopback_origin(origin));
        }
    }

    #[test]
    fn loopback_lookalikes_rejected() {
        // The bug Copilot reported must not regress.
        let rejected = [
            "http://localhost.evil.com",
            "http://127.0.0.1.evil.com",
            "ws://localhost",
            "ftp://localhost",
        ];
        for origin in &rejected {
            assert!(!is_loopback_origin(origin));
        }
    }

    #[test]
    fn pattern_subdomain_matches_apex_and_children() {
        let matching = [
            "https://unpispas.es",
            "https://app.unpispas.es",
            "https://a.b.unpispas.es",
        ];
        for origin in &matching {
            assert!(match_origin_pattern(ANY_SUBDOMAIN, origin));
        }
    }

    #[test]
    fn pattern_wildcard_port() {
        // With and without an explicit port on the origin side.
        let matching = ["https://app.unpispas.es:8443", "https://app.unpispas.es"];
        for origin in &matching {
            assert!(match_origin_pattern(ANY_SUBDOMAIN_ANY_PORT, origin));
        }
    }

    #[test]
    fn pattern_exact_port_enforced() {
        let same_port = match_origin_pattern(EXACT_WITH_PORT, "https://app.example.com:443");
        let other_port = match_origin_pattern(EXACT_WITH_PORT, "https://app.example.com:8080");
        assert!(same_port);
        assert!(!other_port);
    }

    #[test]
    fn pattern_scheme_mismatch_rejected() {
        let matched = match_origin_pattern(ANY_SUBDOMAIN, "http://app.unpispas.es");
        assert!(!matched);
    }

    #[test]
    fn non_pna_origin_not_allowed() {
        let rejected = ["https://evil.com", "https://unpispas.es.evil.com"];
        for origin in &rejected {
            assert!(!is_origin_allowed(origin));
        }
    }
}
269
270/// Builds a TLS acceptor from the embedded cert + key.
271fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
272    let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
273    let tls = native_tls::TlsAcceptor::builder(identity).build()?;
274    Ok(tokio_native_tls::TlsAcceptor::from(tls))
275}
276
277/// Starts a local WebSocket server (plain WS or WSS depending on `use_tls`).
278///
279/// When `use_tls` is true the server presents the embedded `local.unpispas.es`
280/// certificate so browsers can connect via `wss://local.unpispas.es:<port>`
281/// without the Chrome *Local Network Access* prompt.
282pub async fn start_local_server(
283    host: &str,
284    port: u16,
285    services: Arc<HashMap<String, Arc<dyn Service>>>,
286    service_name: &str,
287    service_vers: &str,
288    use_tls: bool,
289) -> Result<(), std::io::Error> {
290    let addr = format!("{}:{}", host, port);
291
292    let tls_acceptor = if use_tls {
293        match build_tls_acceptor() {
294            Ok(a) => {
295                info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
296                Some(a)
297            }
298            Err(e) => {
299                error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
300                None
301            }
302        }
303    } else {
304        None
305    };
306
307    loop {
308        match TcpListener::bind(&addr).await {
309            Ok(listener) => {
310                let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
311                info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
312
313                while let Ok((stream, peer)) = listener.accept().await {
314                    if let Err(e) = stream.set_nodelay(true) {
315                        debug!("set_nodelay (peer={}): {}", peer, e);
316                    }
317                    let services = Arc::clone(&services);
318                    let service_name = service_name.to_string();
319                    let service_vers = service_vers.to_string();
320                    let tls = tls_acceptor.clone();
321
322                    tokio::spawn(async move {
323                        // Dual-mode: peek first byte to detect TLS (0x16) vs plain HTTP
324                        let mut first_byte = [0u8; 1];
325                        let n = match stream.peek(&mut first_byte).await {
326                            Ok(n) => n,
327                            Err(e) => {
328                                debug!("peek failed (peer={}): {}", peer, e);
329                                return;
330                            }
331                        };
332                        if n == 0 {
333                            return;
334                        }
335
336                        let is_tls = first_byte[0] == 0x16;
337                        if is_tls {
338                            // TLS handshake first, then WS
339                            if let Some(acceptor) = tls {
340                                match acceptor.accept(stream).await {
341                                    Ok(tls_stream) => {
342                                        if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
343                                            error!("WSS handler error (peer={}): {}", peer, e);
344                                        }
345                                    }
346                                    Err(e) => {
347                                        debug!("TLS handshake failed (peer={}): {}", peer, e);
348                                    }
349                                }
350                            } else {
351                                debug!("Got TLS handshake but TLS disabled (peer={})", peer);
352                            }
353                        } else {
354                            // Plain HTTP — legacy ws:// clients
355                            if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
356                                error!("WS handler error (peer={}): {}", peer, e);
357                            }
358                        }
359                    });
360                }
361            }
362            Err(e) => {
363                error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
364                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
365            }
366        }
367    }
368}
369
370/// Processes an incoming request by routing it to the appropriate service.
371///
372/// This function handles normalization of incoming messages, extracting metadata,
373/// and calling the appropriate service based on the `TARGET` field in the message.
374pub async fn process_request(
375    mut request: Value,
376    services: Arc<HashMap<String, Arc<dyn Service>>>,
377    service_name: &str,
378    service_vers: &str,
379    write: WebSocketWrite,
380) -> Value {
381    // Attempt to extract UUID from the message or use a default value if absent.
382    let uuid = request
383        .get("UUIDV4")
384        .or_else(|| request.get("MESSAGE_UUID"))
385        .cloned()
386        .unwrap_or(Value::Null);
387
388    // Normalize the message if it contains a `MESSAGE_DATA` field with nested content.
389    if let Some(message_data) = request.get("MESSAGE_DATA") {
390        if message_data.is_object() {
391            info!("Normalizing MESSAGE_DATA for remote request");
392            request = message_data.clone();
393
394            // Ensure `UUIDV4` is present in the normalized message.
395            if uuid != Value::Null && !request.get("UUIDV4").is_some() {
396                request["UUIDV4"] = uuid.clone();
397            }
398        }
399    }
400
401    // Check if the message is a connection response.
402    if request.get("result").is_some() && request.get("server").is_some(){
403        info!("Received CONNECT response: {:?}", request);
404        return json!({
405            "SERVICE_NAME": service_name,
406            "SERVICE_VERS": service_vers,
407            "MESSAGE_TYPE": "INFO",
408            "MESSAGE_EXEC": "SUCCESS",
409            "MESSAGE_UUID": uuid,
410            "MESSAGE_DATA": "Server connection acknowledged",
411        });
412    }
413
414    // Route the request to the target service specified in the `TARGET` field.
415    if let Some(target) = request.get("TARGET").and_then(Value::as_str) {
416        if let Some(service) = services.get(target.to_lowercase().as_str()) {
417            let target_lc = target.to_lowercase();
418            // Keep-alive PING (BASE/PING) is silenced to debug so the log
419            // doesn't fill up with ping chatter every 30s per client.
420            let is_ping = target_lc == "base"
421                && request.get("ACTION").and_then(Value::as_str) == Some("PING");
422            if is_ping {
423                debug!("Running service: {}", target_lc);
424            } else {
425                info!("Running service: {}", target_lc);
426            }
427
428            // Execute the service and build the response.
429            let (status, response) = service.run(request.clone(), write).await;
430            let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
431                // Si es JSON válido, usar el Value directamente (sin escapar)
432                json_value
433            } else {
434                // Si no es JSON, usar como string
435                Value::String(response)
436            };
437            return json!({
438                "SERVICE_NAME": service_name,
439                "SERVICE_VERS": service_vers,
440                "MESSAGE_TYPE": "RESPONSE",
441                "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
442                "MESSAGE_UUID": uuid,
443                "MESSAGE_DATA": message_data,
444            });
445        } else {
446            error!("Unknown service: {}", target);
447            return json!({
448                "SERVICE_NAME": service_name,
449                "SERVICE_VERS": service_vers,
450                "MESSAGE_TYPE": "RESPONSE",
451                "MESSAGE_EXEC": "FAILURE",
452                "MESSAGE_UUID": uuid,
453                "MESSAGE_DATA": format!("Unknown service: {}", target),
454            });
455        }
456    }
457
458    if request.get("SERVICE_NAME").is_some() {
459        error!("Missing TARGET in request.");
460    }
461
462
463    json!({
464        "SERVICE_NAME": service_name,
465        "SERVICE_VERS": service_vers,
466        "MESSAGE_TYPE": "RESPONSE",
467        "MESSAGE_EXEC": "FAILURE",
468        "MESSAGE_UUID": uuid,
469        "MESSAGE_DATA": "Missing TARGET in request.",
470    })
471}
472
473/// Handles a single WebSocket connection end-to-end.
474///
475/// The connection has already passed the dual-mode (TLS vs plain) detection
476/// in `start_local_server`, so `stream` is the appropriate concrete stream
477/// type. We wrap it in [`crate::prelude::BoxedStream`] and run the WS
478/// handshake with [`tokio_tungstenite::accept_hdr_async`] so we can:
479///
480/// 1. Validate the `Origin` header against `sharing::PNA_ALLOWED_ORIGINS`
481///    (with wildcard subdomain / wildcard port matching). Disallowed
482///    origins get HTTP 403.
483/// 2. Inject `Access-Control-Allow-Private-Network: true` into the
484///    101 Switching Protocols response so Chrome's PNA check passes.
485///
486/// After the handshake we read frames, `serde_json`-parse them, and
487/// dispatch to `process_request` which routes by the `TARGET` field.
488///
489/// Note: Chrome does **not** send an OPTIONS preflight before a WebSocket
490/// upgrade for PNA — the PNA check happens inside the WS handshake itself.
491/// So we do not have a separate OPTIONS path.
492pub async fn websocket_handler<S>(
493    stream: S,
494    peer: SocketAddr,
495    services: Arc<HashMap<String, Arc<dyn Service>>>,
496    service_name: &str,
497    service_vers: &str,
498) -> Result<(), Box<dyn std::error::Error>>
499where
500    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
501{
502    let boxed = crate::prelude::BoxedStream(Box::new(stream));
503
504    // Validate Origin against the configured allow-list and inject PNA headers
505    // into the 101 Switching Protocols response.
506    let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
507        // Helper: build a 400 ErrorResponse when a header value fails to
508        // parse. The previous code used `.parse().unwrap()` which would
509        // panic the server thread on a malformed Origin (trivial DoS).
510        fn bad_request(reason: &str) -> ErrorResponse {
511            let mut resp = ErrorResponse::new(Some(reason.to_string()));
512            *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::BAD_REQUEST;
513            resp
514        }
515
516        // The two static values are fixed, ASCII-safe constants — they're
517        // built once and reused for every handshake. If they ever fail to
518        // parse it's a programming error, not user input.
519        let pna_true: tokio_tungstenite::tungstenite::http::HeaderValue =
520            "true".parse().expect("Access-Control-Allow-Private-Network value parses");
521        let vary_origin: tokio_tungstenite::tungstenite::http::HeaderValue =
522            "Origin".parse().expect("Vary header value parses");
523
524        let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
525        match origin {
526            None => {
527                // Non-browser clients (native apps, CLI) often omit Origin — allow.
528                let h = response.headers_mut();
529                h.insert("Access-Control-Allow-Private-Network", pna_true);
530                Ok(response)
531            }
532            Some(o) if is_origin_allowed(o) => {
533                // Origin is user-controlled. A weird/malformed string can
534                // make HeaderValue::from_str fail; reject with 400 instead
535                // of panicking the server.
536                let origin_value = match o.parse::<tokio_tungstenite::tungstenite::http::HeaderValue>() {
537                    Ok(v) => v,
538                    Err(_) => {
539                        error!("Rejected WS handshake: Origin header is not a valid HeaderValue: {}", o);
540                        return Err(bad_request("Invalid Origin header"));
541                    }
542                };
543                let h = response.headers_mut();
544                h.insert("Access-Control-Allow-Private-Network", pna_true);
545                h.insert("Access-Control-Allow-Origin", origin_value);
546                h.insert("Vary", vary_origin);
547                Ok(response)
548            }
549            Some(o) => {
550                error!("Rejected WS handshake from disallowed origin: {}", o);
551                let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
552                *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
553                Err(resp)
554            }
555        }
556    };
557
558    let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
559        Ok(ws) => {
560            debug!("WebSocket handshake OK (peer={})", peer);
561            ws
562        }
563        Err(e) => {
564            error!(
565                "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
566                peer, e
567            );
568            return Err(e.into());
569        }
570    };
571    let (write, mut read) = ws_stream.split();
572
573    // Create a write lock to manage sending responses back to the client.
574    let rw_write = Arc::new(tokio::sync::RwLock::new(write));
575
576    while let Some(msg) = read.next().await {
577        match msg {
578            Ok(Message::Text(text)) => {
579                let parsed = serde_json::from_str::<serde_json::Value>(&text);
580
581                // Quiet keep-alive pings so they don't drown real traffic.
582                // Anything else (CHECK, print requests, etc.) stays at info.
583                let is_ping = parsed
584                    .as_ref()
585                    .ok()
586                    .and_then(|req| req.get("ACTION"))
587                    .and_then(Value::as_str)
588                    == Some("PING");
589                let truncated = if text.len() > 200 {
590                    format!("{}...", &text[..200])
591                } else {
592                    text.clone()
593                };
594                if is_ping {
595                    debug!("Received text message HANDLER (peer={}): {}", peer, truncated);
596                } else {
597                    info!("Received text message HANDLER (peer={}): {}", peer, truncated);
598                }
599
600                match parsed {
601                    Ok(request) => {
602                        let response_message = process_request(
603                            request,
604                            Arc::clone(&services),
605                            service_name,
606                            service_vers,
607                            Some(Arc::clone(&rw_write)),
608                        )
609                            .await;
610
611                        let mut ws = rw_write.write().await;
612                        if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
613                            error!("Failed to send response: {}", e);
614                        }
615                    }
616                    Err(e) => {
617                        //response error!
618                        let error_response = json!({
619                            "SERVICE_NAME": service_name,
620                            "SERVICE_VERS": service_vers,
621                            "MESSAGE_TYPE": "RESPONSE",
622                            "MESSAGE_EXEC": "FAILURE",
623                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
624                        });
625                        let mut ws = rw_write.write().await;
626                        if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
627                            error!("Failed to send error response: {}", e);
628                        }
629                        error!("Invalid JSON: {}", e);
630                    }
631                }
632            }
633            Err(e) => {
634                let error_response = json!({
635                            "SERVICE_NAME": service_name,
636                            "SERVICE_VERS": service_vers,
637                            "MESSAGE_TYPE": "RESPONSE",
638                            "MESSAGE_EXEC": "FAILURE",
639                            "MESSAGE_DATA": format!("Invalid JSON: {}", e),
640                        });
641                let mut ws = rw_write.write().await;
642                if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
643                    error!("Failed to send error response: {}", e);
644                }
645                debug!("Error processing message: {}", e)
646            },
647            _ => {}
648        }
649    }
650
651    Ok(())
652}
653
654/// Starts the remote WebSocket server with reconnection and message handling.
655///
656/// This function keeps trying to connect to the remote server in a loop,
657/// reconnecting automatically if the connection fails.
658pub async fn start_remote_server(
659    remote_host: &str,
660    remote_port: u16,
661    service_name: &str,
662    service_vers: &str,
663    services: Arc<HashMap<String, Arc<dyn Service>>>,
664) -> Result<(), std::io::Error> {
665    loop {
666        match connect_to_remote_server(
667            remote_host,
668            remote_port,
669            service_name,
670            service_vers,
671            Arc::clone(&services),
672            false,
673        )
674            .await
675        {
676            Ok(_) => info!("Successfully connected to remote server."),
677            Err(e) => {
678                error!("Failed to connect to remote server: {:?}", e);
679                info!("Reconnecting in 5 seconds...");
680                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
681            }
682        }
683    }
684}
685
686/// Handles a single connection to the remote WebSocket server.
687///
688/// This function sends a `CONNECT` message to the server upon successful connection
689/// and processes incoming messages by routing them to the appropriate services.
690async fn connect_to_remote_server(
691    remote_host: &str,
692    remote_port: u16,
693    service_name: &str,
694    service_vers: &str,
695    services: Arc<HashMap<String, Arc<dyn Service>>>,
696    return_after_connect: bool,
697) -> Result<(), Box<dyn std::error::Error>> {
698    let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
699        "wss"
700    } else {
701        "ws"
702    };
703
704    let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
705    info!("Connecting to remote WebSocket server at: {}", url);
706
707    let ws_url = Url::parse(&url)?;
708    let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
709    let (mut write, mut read) = ws_stream.split();
710
711    let connect_message = json!({
712        "SERVICE_NAME": service_name,
713        "SERVICE_VERS": sharing::VERSION,
714        "MESSAGE_TYPE": "CONNECT",
715        "MODULE_LIST": services.iter().map(|(name, service)| {
716            json!({
717                "MODULE_NAME": name,
718                "MODULE_VERS": service.get_version(),
719            })
720        }).collect::<Vec<_>>(),
721    });
722
723    write.send(Message::Text(connect_message.to_string())).await?;
724    info!("CONNECT message sent.");
725
726    if return_after_connect {
727        return Ok(());
728    }
729
730    let write_p = Arc::new(tokio::sync::Mutex::new(write));
731    let service_name = service_name.to_owned();
732    let service_vers = service_vers.to_owned();
733
734    // Shared cancel signal so a keep-alive failure aborts the read loop
735    // immediately instead of leaving it parked on the 45 s timeout below.
736    // Without this the link sat zombie for up to 45 s after a half-open
737    // TCP, which is what made operators restart the "pispas modules"
738    // service by hand.
739    let cancel_token = Arc::new(tokio::sync::Notify::new());
740
741    // Keep-alive: envía PING cada 25 segundos
742    let keep_alive_write = Arc::clone(&write_p);
743    let keep_alive_cancel = Arc::clone(&cancel_token);
744    let keep_alive_task = tokio::spawn(async move {
745        let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
746
747        loop {
748            interval.tick().await;
749
750            let mut ws = keep_alive_write.lock().await;
751            if let Err(e) = ws.send(Message::Ping(vec![])).await {
752                error!("Keep-alive failed: {}", e);
753                // notify_one (vs notify_waiters) holds a permit if the read
754                // loop hasn't reached its `notified()` yet, so the signal
755                // can't be missed on a fast failure.
756                keep_alive_cancel.notify_one();
757                break;
758            }
759        }
760    });
761
762    // Inactivity timeout — if the server stops sending traffic (or its TCP
763    // reset got dropped by a NAT), give up and reconnect. 45 s is twice the
764    // server's ping_interval (20–30 s) so a single missed pong won't trip it.
765    const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(45);
766
767    let read_cancel = Arc::clone(&cancel_token);
768    let result = async {
769        loop {
770            tokio::select! {
771                biased;
772                _ = read_cancel.notified() => {
773                    error!("Keep-alive task signalled connection loss, breaking read loop");
774                    break;
775                }
776                next = tokio::time::timeout(READ_TIMEOUT, read.next()) => {
777                    match next {
778                        Ok(Some(Ok(Message::Text(text)))) => {
779                            info!("Received message");
780                            if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
781                                let response = process_request(
782                                    request,
783                                    Arc::clone(&services),
784                                    &service_name,
785                                    &service_vers,
786                                    None,
787                                ).await;
788
789                                let mut ws = write_p.lock().await;
790                                ws.send(Message::Text(response.to_string())).await?;
791                            }
792                        }
793                        Ok(Some(Ok(Message::Ping(ping)))) => {
794                            debug!("Received PING from server, sending PONG");
795                            let mut ws = write_p.lock().await;
796                            ws.send(Message::Pong(ping)).await?;
797                        }
798                        Ok(Some(Ok(Message::Pong(_)))) => {
799                            // PONG recibido - conexión sana
800                        }
801                        Ok(Some(Ok(Message::Close(_)))) => {
802                            error!("Connection closed by server");
803                            break;
804                        }
805                        Ok(Some(Err(e))) => {
806                            error!("WebSocket error: {}", e);
807                            break;
808                        }
809                        Ok(None) => {
810                            error!("Connection closed");
811                            break;
812                        }
813                        Err(_) => {
814                            error!(
815                                "No activity in {} seconds, reconnecting",
816                                READ_TIMEOUT.as_secs()
817                            );
818                            break;
819                        }
820                        // Message::Binary / Message::Frame — not part of our
821                        // protocol, but tungstenite can deliver them. Just
822                        // ignore and keep listening.
823                        Ok(Some(Ok(_))) => {}
824                    }
825                }
826            }
827        }
828        Ok::<(), Box<dyn std::error::Error>>(())
829    }.await;
830
831    keep_alive_task.abort();
832    let _ = keep_alive_task.await;
833    result
834}
835
836/// Sends a message to the WebSocket connection.
837///
838/// # Arguments
839/// - `write`: The WebSocket connection.
840/// - `message_data`: The message to send.
841pub async fn send_message(write: &WebSocketWrite, message_data: String) {
842    if let Some(ws_lock) = &write {
843        let mut ws = ws_lock.write().await;
844
845        match ws.send(Message::Text(message_data.clone())).await {
846            Ok(_) => {
847                info!("Message sent successfully: {}", message_data);
848            }
849            Err(e) => {
850                error!("Failed to send message: {:?}", e);
851            }
852        }
853    } else {
854        info!("No WebSocket available to send message.");
855    }
856}
857