1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info, warn};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
24const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
30const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39 let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41 let required_services = std::env::var("MODULES")
43 .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45
46 info!("Loading persistence module: cache");
47 let mut persistence_service = crate::persistence::PersistenceService::new();
48 if let Err(e) = persistence_service.initialize().await {
49 error!("Failed to initialize persistence service: {}", e);
50 }
51 let persistence_arc = Arc::new(persistence_service);
55 services.insert("cache".to_string(), persistence_arc.clone());
56 for service_name in required_services.split(',') {
58 match service_name.trim() {
59 "base" => {
60 services.insert("base".to_string(), Arc::new(BaseService::new()));
61 }
62 "print" => {
63 let print_service = PrintService::new(config.clone()).await;
64 services.insert("print".to_string(), Arc::new(print_service));
65 }
66 "scale" => {
67 services.insert("scale".to_string(), Arc::new(ScaleService::new()));
68 }
69 "rfid" => {
70 services.insert("rfid".to_string(), Arc::new(RfidService::new()));
71 }
72 "cashguard" => {
73 services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
74 }
75 "paytef" => {
76 services.insert("paytef".to_string(), Arc::new(PaytefService::new(Some(persistence_arc.clone()))));
77 }
78 "mypos" => {
79 services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
80 }
81 "commandViewer" => {
82 std::thread::spawn(|| {
83 let rt = tokio::runtime::Runtime::new().unwrap();
84 rt.block_on(async {
85 if let Err(e) = crate::command_viewer::start_kitchen_server().await {
86 eprintln!("Failed to start Actix server: {:?}", e);
87 }
88 });
89 });
90
91 services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
92 }
93 "OrderKitchen" => {
94 services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
95 }
96 _ => {
97 error!("Unknown service: {}", service_name.trim());
98 }
99 }
100 }
101 info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
102 services
103}
104
105fn is_origin_allowed(origin: &str) -> bool {
121 if origin == "null" || is_loopback_origin(origin) {
122 return true;
123 }
124 sharing::PNA_ALLOWED_ORIGINS
125 .iter()
126 .any(|pattern| match_origin_pattern(pattern, origin))
127}
128
129fn is_loopback_origin(origin: &str) -> bool {
133 let (scheme, rest) = match origin.split_once("://") {
134 Some(v) => v,
135 None => return false,
136 };
137 if scheme != "http" && scheme != "https" {
138 return false;
139 }
140 let host = if let Some(after_bracket) = rest.strip_prefix('[') {
143 match after_bracket.find(']') {
144 Some(end) => &rest[..end + 2], None => return false,
146 }
147 } else {
148 rest.split(':').next().unwrap_or(rest)
149 };
150 host == "localhost" || host == "127.0.0.1" || host == "[::1]"
151}
152
153fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
158 let (p_scheme, p_rest) = match pattern.split_once("://") {
160 Some(v) => v,
161 None => return false,
162 };
163 let (o_scheme, o_rest) = match origin.split_once("://") {
164 Some(v) => v,
165 None => return false,
166 };
167 if p_scheme != o_scheme {
168 return false;
169 }
170
171 let (p_host, p_port) = p_rest.split_once(':').unwrap_or((p_rest, ""));
173 let (o_host, o_port) = o_rest.split_once(':').unwrap_or((o_rest, ""));
174
175 let port_ok = match p_port {
177 "" => o_port.is_empty(),
178 "*" => true,
179 _ => p_port == o_port,
180 };
181 if !port_ok {
182 return false;
183 }
184
185 if let Some(suffix) = p_host.strip_prefix("*.") {
188 o_host == suffix || o_host.ends_with(&format!(".{}", suffix))
189 } else {
190 p_host == o_host
191 }
192}
193
194#[cfg(test)]
195mod origin_tests {
196 use super::*;
197
198 #[test]
199 fn loopback_exact_hosts_allowed() {
200 assert!(is_loopback_origin("http://localhost"));
201 assert!(is_loopback_origin("https://localhost:3000"));
202 assert!(is_loopback_origin("http://127.0.0.1"));
203 assert!(is_loopback_origin("http://[::1]"));
204 }
205
206 #[test]
207 fn loopback_lookalikes_rejected() {
208 assert!(!is_loopback_origin("http://localhost.evil.com"));
210 assert!(!is_loopback_origin("http://127.0.0.1.evil.com"));
211 assert!(!is_loopback_origin("ws://localhost"));
212 assert!(!is_loopback_origin("ftp://localhost"));
213 }
214
215 #[test]
216 fn pattern_subdomain_matches_apex_and_children() {
217 assert!(match_origin_pattern(
218 "https://*.unpispas.es",
219 "https://unpispas.es"
220 ));
221 assert!(match_origin_pattern(
222 "https://*.unpispas.es",
223 "https://app.unpispas.es"
224 ));
225 assert!(match_origin_pattern(
226 "https://*.unpispas.es",
227 "https://a.b.unpispas.es"
228 ));
229 }
230
231 #[test]
232 fn pattern_wildcard_port() {
233 assert!(match_origin_pattern(
234 "https://*.unpispas.es:*",
235 "https://app.unpispas.es:8443"
236 ));
237 assert!(match_origin_pattern(
238 "https://*.unpispas.es:*",
239 "https://app.unpispas.es"
240 ));
241 }
242
243 #[test]
244 fn pattern_exact_port_enforced() {
245 assert!(match_origin_pattern(
246 "https://app.example.com:443",
247 "https://app.example.com:443"
248 ));
249 assert!(!match_origin_pattern(
250 "https://app.example.com:443",
251 "https://app.example.com:8080"
252 ));
253 }
254
255 #[test]
256 fn pattern_scheme_mismatch_rejected() {
257 assert!(!match_origin_pattern(
258 "https://*.unpispas.es",
259 "http://app.unpispas.es"
260 ));
261 }
262
263 #[test]
264 fn non_pna_origin_not_allowed() {
265 assert!(!is_origin_allowed("https://evil.com"));
266 assert!(!is_origin_allowed("https://unpispas.es.evil.com"));
267 }
268}
269
270fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
272 let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
273 let tls = native_tls::TlsAcceptor::builder(identity).build()?;
274 Ok(tokio_native_tls::TlsAcceptor::from(tls))
275}
276
277pub async fn start_local_server(
283 host: &str,
284 port: u16,
285 services: Arc<HashMap<String, Arc<dyn Service>>>,
286 service_name: &str,
287 service_vers: &str,
288 use_tls: bool,
289) -> Result<(), std::io::Error> {
290 let addr = format!("{}:{}", host, port);
291
292 let tls_acceptor = if use_tls {
293 match build_tls_acceptor() {
294 Ok(a) => {
295 info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
296 Some(a)
297 }
298 Err(e) => {
299 error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
300 None
301 }
302 }
303 } else {
304 None
305 };
306
307 loop {
308 match TcpListener::bind(&addr).await {
309 Ok(listener) => {
310 let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
311 info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
312
313 while let Ok((stream, peer)) = listener.accept().await {
314 if let Err(e) = stream.set_nodelay(true) {
315 debug!("set_nodelay (peer={}): {}", peer, e);
316 }
317 let services = Arc::clone(&services);
318 let service_name = service_name.to_string();
319 let service_vers = service_vers.to_string();
320 let tls = tls_acceptor.clone();
321
322 tokio::spawn(async move {
323 let mut first_byte = [0u8; 1];
325 let n = match stream.peek(&mut first_byte).await {
326 Ok(n) => n,
327 Err(e) => {
328 debug!("peek failed (peer={}): {}", peer, e);
329 return;
330 }
331 };
332 if n == 0 {
333 return;
334 }
335
336 let is_tls = first_byte[0] == 0x16;
337 if is_tls {
338 if let Some(acceptor) = tls {
340 match acceptor.accept(stream).await {
341 Ok(tls_stream) => {
342 if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
343 error!("WSS handler error (peer={}): {}", peer, e);
344 }
345 }
346 Err(e) => {
347 debug!("TLS handshake failed (peer={}): {}", peer, e);
348 }
349 }
350 } else {
351 debug!("Got TLS handshake but TLS disabled (peer={})", peer);
352 }
353 } else {
354 if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
356 error!("WS handler error (peer={}): {}", peer, e);
357 }
358 }
359 });
360 }
361 }
362 Err(e) => {
363 error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
364 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
365 }
366 }
367 }
368}
369
370pub async fn process_request(
375 mut request: Value,
376 services: Arc<HashMap<String, Arc<dyn Service>>>,
377 service_name: &str,
378 service_vers: &str,
379 write: WebSocketWrite,
380) -> Value {
381 let uuid = request
383 .get("UUIDV4")
384 .or_else(|| request.get("MESSAGE_UUID"))
385 .cloned()
386 .unwrap_or(Value::Null);
387
388 if let Some(message_data) = request.get("MESSAGE_DATA") {
390 if message_data.is_object() {
391 info!("Normalizing MESSAGE_DATA for remote request");
392 request = message_data.clone();
393
394 if uuid != Value::Null && !request.get("UUIDV4").is_some() {
396 request["UUIDV4"] = uuid.clone();
397 }
398 }
399 }
400
401 if request.get("result").is_some() && request.get("server").is_some(){
403 info!("Received CONNECT response: {:?}", request);
404
405 if let Some(objid) = request.get("objid").and_then(|v| v.as_i64()) {
410 let mut cfg = sharing::utils::ConfigEnv::load();
411 if cfg.objid != Some(objid) {
412 info!("WSR delivered objid={objid} (was {:?}) — persisting", cfg.objid);
413 cfg.set_objid(objid);
414 }
415 }
416
417 return json!({
418 "SERVICE_NAME": service_name,
419 "SERVICE_VERS": service_vers,
420 "MESSAGE_TYPE": "INFO",
421 "MESSAGE_EXEC": "SUCCESS",
422 "MESSAGE_UUID": uuid,
423 "MESSAGE_DATA": "Server connection acknowledged",
424 });
425 }
426
427 if let Some(target) = request.get("TARGET").and_then(Value::as_str) {
429 if let Some(service) = services.get(target.to_lowercase().as_str()) {
430 let target_lc = target.to_lowercase();
431 let is_ping = target_lc == "base"
434 && request.get("ACTION").and_then(Value::as_str) == Some("PING");
435 if is_ping {
436 debug!("Running service: {}", target_lc);
437 } else {
438 info!("Running service: {}", target_lc);
439 }
440
441 let (status, response) = service.run(request.clone(), write).await;
443 let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
444 json_value
446 } else {
447 Value::String(response)
449 };
450 return json!({
451 "SERVICE_NAME": service_name,
452 "SERVICE_VERS": service_vers,
453 "MESSAGE_TYPE": "RESPONSE",
454 "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
455 "MESSAGE_UUID": uuid,
456 "MESSAGE_DATA": message_data,
457 });
458 } else {
459 error!("Unknown service: {}", target);
460 return json!({
461 "SERVICE_NAME": service_name,
462 "SERVICE_VERS": service_vers,
463 "MESSAGE_TYPE": "RESPONSE",
464 "MESSAGE_EXEC": "FAILURE",
465 "MESSAGE_UUID": uuid,
466 "MESSAGE_DATA": format!("Unknown service: {}", target),
467 });
468 }
469 }
470
471 if request.get("SERVICE_NAME").is_some() {
472 error!("Missing TARGET in request.");
473 }
474
475
476 json!({
477 "SERVICE_NAME": service_name,
478 "SERVICE_VERS": service_vers,
479 "MESSAGE_TYPE": "RESPONSE",
480 "MESSAGE_EXEC": "FAILURE",
481 "MESSAGE_UUID": uuid,
482 "MESSAGE_DATA": "Missing TARGET in request.",
483 })
484}
485
486pub async fn websocket_handler<S>(
506 stream: S,
507 peer: SocketAddr,
508 services: Arc<HashMap<String, Arc<dyn Service>>>,
509 service_name: &str,
510 service_vers: &str,
511) -> Result<(), Box<dyn std::error::Error>>
512where
513 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
514{
515 let boxed = crate::prelude::BoxedStream(Box::new(stream));
516
517 let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
520 fn bad_request(reason: &str) -> ErrorResponse {
524 let mut resp = ErrorResponse::new(Some(reason.to_string()));
525 *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::BAD_REQUEST;
526 resp
527 }
528
529 let pna_true: tokio_tungstenite::tungstenite::http::HeaderValue =
533 "true".parse().expect("Access-Control-Allow-Private-Network value parses");
534 let vary_origin: tokio_tungstenite::tungstenite::http::HeaderValue =
535 "Origin".parse().expect("Vary header value parses");
536
537 let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
538 match origin {
539 None => {
540 let h = response.headers_mut();
542 h.insert("Access-Control-Allow-Private-Network", pna_true);
543 Ok(response)
544 }
545 Some(o) if is_origin_allowed(o) => {
546 let origin_value = match o.parse::<tokio_tungstenite::tungstenite::http::HeaderValue>() {
550 Ok(v) => v,
551 Err(_) => {
552 error!("Rejected WS handshake: Origin header is not a valid HeaderValue: {}", o);
553 return Err(bad_request("Invalid Origin header"));
554 }
555 };
556 let h = response.headers_mut();
557 h.insert("Access-Control-Allow-Private-Network", pna_true);
558 h.insert("Access-Control-Allow-Origin", origin_value);
559 h.insert("Vary", vary_origin);
560 Ok(response)
561 }
562 Some(o) => {
563 error!("Rejected WS handshake from disallowed origin: {}", o);
564 let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
565 *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
566 Err(resp)
567 }
568 }
569 };
570
571 let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
572 Ok(ws) => {
573 debug!("WebSocket handshake OK (peer={})", peer);
574 ws
575 }
576 Err(e) => {
577 error!(
578 "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
579 peer, e
580 );
581 return Err(e.into());
582 }
583 };
584 let (write, mut read) = ws_stream.split();
585
586 let rw_write = Arc::new(tokio::sync::RwLock::new(write));
588
589 while let Some(msg) = read.next().await {
590 match msg {
591 Ok(Message::Text(text)) => {
592 let parsed = serde_json::from_str::<serde_json::Value>(&text);
593
594 let is_ping = parsed
597 .as_ref()
598 .ok()
599 .and_then(|req| req.get("ACTION"))
600 .and_then(Value::as_str)
601 == Some("PING");
602 let truncated = if text.len() > 200 {
603 format!("{}...", &text[..200])
604 } else {
605 text.clone()
606 };
607 if is_ping {
608 debug!("Received text message HANDLER (peer={}): {}", peer, truncated);
609 } else {
610 info!("Received text message HANDLER (peer={}): {}", peer, truncated);
611 }
612
613 match parsed {
614 Ok(request) => {
615 let response_message = process_request(
616 request,
617 Arc::clone(&services),
618 service_name,
619 service_vers,
620 Some(Arc::clone(&rw_write)),
621 )
622 .await;
623
624 let mut ws = rw_write.write().await;
625 if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
626 error!("Failed to send response: {}", e);
627 }
628 }
629 Err(e) => {
630 let error_response = json!({
632 "SERVICE_NAME": service_name,
633 "SERVICE_VERS": service_vers,
634 "MESSAGE_TYPE": "RESPONSE",
635 "MESSAGE_EXEC": "FAILURE",
636 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
637 });
638 let mut ws = rw_write.write().await;
639 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
640 error!("Failed to send error response: {}", e);
641 }
642 error!("Invalid JSON: {}", e);
643 }
644 }
645 }
646 Err(e) => {
647 let error_response = json!({
648 "SERVICE_NAME": service_name,
649 "SERVICE_VERS": service_vers,
650 "MESSAGE_TYPE": "RESPONSE",
651 "MESSAGE_EXEC": "FAILURE",
652 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
653 });
654 let mut ws = rw_write.write().await;
655 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
656 error!("Failed to send error response: {}", e);
657 }
658 debug!("Error processing message: {}", e)
659 },
660 _ => {}
661 }
662 }
663
664 Ok(())
665}
666
667pub async fn start_remote_server(
679 remote_host: &str,
680 remote_port: u16,
681 service_name: &str,
682 service_vers: &str,
683 services: Arc<HashMap<String, Arc<dyn Service>>>,
684) -> Result<(), std::io::Error> {
685 loop {
686 let cfg = sharing::utils::ConfigEnv::load();
690 let Some(place_id) = cfg.place_id else {
691 info!(
692 "Remote WSR connect deferred — TPV in PRE-BOOTSTRAP \
693 (waiting for a client CHECK with place_id)"
694 );
695 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
696 continue;
697 };
698
699 match connect_to_remote_server(
700 remote_host,
701 remote_port,
702 service_name,
703 service_vers,
704 place_id,
705 Arc::clone(&services),
706 false,
707 )
708 .await
709 {
710 Ok(_) => info!("Successfully connected to remote server."),
711 Err(e) => {
712 error!("Failed to connect to remote server: {:?}", e);
713 info!("Reconnecting in 5 seconds...");
714 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
715 }
716 }
717 }
718}
719
720fn discover_local_ipv4() -> String {
725 match local_ip_address::local_ip() {
726 Ok(ip) if ip.is_ipv4() => ip.to_string(),
727 Ok(ip) => {
728 warn!("local IP is not IPv4 ({ip}); cert flow will not work over LAN");
729 ip.to_string()
730 }
731 Err(e) => {
732 warn!("could not discover local IP: {e}");
733 "0.0.0.0".to_string()
734 }
735 }
736}
737
738async fn connect_to_remote_server(
743 remote_host: &str,
744 remote_port: u16,
745 service_name: &str,
746 service_vers: &str,
747 place_id: i64,
748 services: Arc<HashMap<String, Arc<dyn Service>>>,
749 return_after_connect: bool,
750) -> Result<(), Box<dyn std::error::Error>> {
751 let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
752 "wss"
753 } else {
754 "ws"
755 };
756
757 let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
758 info!("Connecting to remote WebSocket server at: {}", url);
759
760 let ws_url = Url::parse(&url)?;
761 let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
762 let (mut write, mut read) = ws_stream.split();
763
764 let local_ip = discover_local_ipv4();
765 info!("CONNECT envelope: place_id={place_id} local_ip={local_ip}");
766
767 let connect_message = json!({
768 "SERVICE_NAME": service_name,
769 "SERVICE_VERS": sharing::VERSION,
770 "MESSAGE_TYPE": "CONNECT",
771 "PLACE_ID": place_id,
776 "LOCAL_IP": local_ip,
777 "MODULE_LIST": services.iter().map(|(name, service)| {
778 json!({
779 "MODULE_NAME": name,
780 "MODULE_VERS": service.get_version(),
781 })
782 }).collect::<Vec<_>>(),
783 });
784
785 write.send(Message::Text(connect_message.to_string())).await?;
786 info!("CONNECT message sent.");
787
788 if return_after_connect {
789 return Ok(());
790 }
791
792 let write_p = Arc::new(tokio::sync::Mutex::new(write));
793 let service_name = service_name.to_owned();
794 let service_vers = service_vers.to_owned();
795
796 let cancel_token = Arc::new(tokio::sync::Notify::new());
802
803 let keep_alive_write = Arc::clone(&write_p);
805 let keep_alive_cancel = Arc::clone(&cancel_token);
806 let keep_alive_task = tokio::spawn(async move {
807 let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
808
809 loop {
810 interval.tick().await;
811
812 let mut ws = keep_alive_write.lock().await;
813 if let Err(e) = ws.send(Message::Ping(vec![])).await {
814 error!("Keep-alive failed: {}", e);
815 keep_alive_cancel.notify_one();
819 break;
820 }
821 }
822 });
823
824 const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(45);
828
829 let read_cancel = Arc::clone(&cancel_token);
830 let result = async {
831 loop {
832 tokio::select! {
833 biased;
834 _ = read_cancel.notified() => {
835 error!("Keep-alive task signalled connection loss, breaking read loop");
836 break;
837 }
838 next = tokio::time::timeout(READ_TIMEOUT, read.next()) => {
839 match next {
840 Ok(Some(Ok(Message::Text(text)))) => {
841 info!("Received message");
842 if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
843 let response = process_request(
844 request,
845 Arc::clone(&services),
846 &service_name,
847 &service_vers,
848 None,
849 ).await;
850
851 let mut ws = write_p.lock().await;
852 ws.send(Message::Text(response.to_string())).await?;
853 }
854 }
855 Ok(Some(Ok(Message::Ping(ping)))) => {
856 debug!("Received PING from server, sending PONG");
857 let mut ws = write_p.lock().await;
858 ws.send(Message::Pong(ping)).await?;
859 }
860 Ok(Some(Ok(Message::Pong(_)))) => {
861 }
863 Ok(Some(Ok(Message::Close(_)))) => {
864 error!("Connection closed by server");
865 break;
866 }
867 Ok(Some(Err(e))) => {
868 error!("WebSocket error: {}", e);
869 break;
870 }
871 Ok(None) => {
872 error!("Connection closed");
873 break;
874 }
875 Err(_) => {
876 error!(
877 "No activity in {} seconds, reconnecting",
878 READ_TIMEOUT.as_secs()
879 );
880 break;
881 }
882 Ok(Some(Ok(_))) => {}
886 }
887 }
888 }
889 }
890 Ok::<(), Box<dyn std::error::Error>>(())
891 }.await;
892
893 keep_alive_task.abort();
894 let _ = keep_alive_task.await;
895 result
896}
897
898pub async fn send_message(write: &WebSocketWrite, message_data: String) {
904 if let Some(ws_lock) = &write {
905 let mut ws = ws_lock.write().await;
906
907 match ws.send(Message::Text(message_data.clone())).await {
908 Ok(_) => {
909 info!("Message sent successfully: {}", message_data);
910 }
911 Err(e) => {
912 error!("Failed to send message: {:?}", e);
913 }
914 }
915 } else {
916 info!("No WebSocket available to send message.");
917 }
918}
919