1use serde_json::Value;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpListener;
7use tokio_tungstenite::{accept_hdr_async, connect_async, tungstenite::protocol::Message};
8use tokio_tungstenite::tungstenite::handshake::server::{Request, Response, ErrorResponse};
9use futures_util::{SinkExt, StreamExt};
10use serde_json::json;
11use url::Url;
12use anyhow::{Result};
13use easy_trace::prelude::{debug, error, info};
14use sharing::utils::ConfigEnv;
15use crate::MyPOSService;
16use crate::prelude::{BaseService};
17use crate::prelude::CashGuardService;
18use crate::prelude::PaytefService;
19use crate::prelude::PrintService;
20use crate::prelude::RfidService;
21use crate::prelude::ScaleService;
22use crate::prelude::{Service, WebSocketWrite};
23
24const LOCAL_TLS_CERT: &[u8] = include_bytes!("../../../resources/local_unpispas.crt");
30const LOCAL_TLS_KEY: &[u8] = include_bytes!("../../../resources/local_unpispas.key");
31
32
33
34pub async fn load_services(config: ConfigEnv) -> HashMap<String, Arc<dyn Service>> {
39 let mut services: HashMap<String, Arc<dyn Service>> = HashMap::new();
40
41 let required_services = std::env::var("MODULES")
43 .unwrap_or_else(|_| "base,print,scale,rfid".to_string());
44
45
46 info!("Loading persistence module: cache");
47 let mut persistence_service = crate::persistence::PersistenceService::new();
48 if let Err(e) = persistence_service.initialize().await {
49 error!("Failed to initialize persistence service: {}", e);
50 }
51 let persistence_arc = Arc::new(persistence_service);
55 services.insert("cache".to_string(), persistence_arc.clone());
56 for service_name in required_services.split(',') {
58 match service_name.trim() {
59 "base" => {
60 services.insert("base".to_string(), Arc::new(BaseService::new()));
61 }
62 "print" => {
63 let print_service = PrintService::new(config.clone()).await;
64 services.insert("print".to_string(), Arc::new(print_service));
65 }
66 "scale" => {
67 services.insert("scale".to_string(), Arc::new(ScaleService::new()));
68 }
69 "rfid" => {
70 services.insert("rfid".to_string(), Arc::new(RfidService::new()));
71 }
72 "cashguard" => {
73 services.insert("cashguard".to_string(), Arc::new(CashGuardService::new("localhost", 8080, "root", "admin")));
74 }
75 "paytef" => {
76 services.insert("paytef".to_string(), Arc::new(PaytefService::new(Some(persistence_arc.clone()))));
77 }
78 "mypos" => {
79 services.insert("mypos".to_string(), Arc::new(MyPOSService::new()));
80 }
81 "commandViewer" => {
82 std::thread::spawn(|| {
83 let rt = tokio::runtime::Runtime::new().unwrap();
84 rt.block_on(async {
85 if let Err(e) = crate::command_viewer::start_kitchen_server().await {
86 eprintln!("Failed to start Actix server: {:?}", e);
87 }
88 });
89 });
90
91 services.insert("commandViewer".to_string(), Arc::new(crate::command_viewer::CommandViewerService::new()));
92 }
93 "OrderKitchen" => {
94 services.insert("OrderKitchen".to_string(), Arc::new(crate::order_kitchen::OrderKitchenService::new()));
95 }
96 _ => {
97 error!("Unknown service: {}", service_name.trim());
98 }
99 }
100 }
101 info!("Loaded services: {:?}", services.keys().collect::<Vec<_>>());
102 services
103}
104
105fn is_origin_allowed(origin: &str) -> bool {
121 if origin == "null" || is_loopback_origin(origin) {
122 return true;
123 }
124 sharing::PNA_ALLOWED_ORIGINS
125 .iter()
126 .any(|pattern| match_origin_pattern(pattern, origin))
127}
128
129fn is_loopback_origin(origin: &str) -> bool {
133 let (scheme, rest) = match origin.split_once("://") {
134 Some(v) => v,
135 None => return false,
136 };
137 if scheme != "http" && scheme != "https" {
138 return false;
139 }
140 let host = if let Some(after_bracket) = rest.strip_prefix('[') {
143 match after_bracket.find(']') {
144 Some(end) => &rest[..end + 2], None => return false,
146 }
147 } else {
148 rest.split(':').next().unwrap_or(rest)
149 };
150 host == "localhost" || host == "127.0.0.1" || host == "[::1]"
151}
152
153fn match_origin_pattern(pattern: &str, origin: &str) -> bool {
158 let (p_scheme, p_rest) = match pattern.split_once("://") {
160 Some(v) => v,
161 None => return false,
162 };
163 let (o_scheme, o_rest) = match origin.split_once("://") {
164 Some(v) => v,
165 None => return false,
166 };
167 if p_scheme != o_scheme {
168 return false;
169 }
170
171 let (p_host, p_port) = p_rest.split_once(':').unwrap_or((p_rest, ""));
173 let (o_host, o_port) = o_rest.split_once(':').unwrap_or((o_rest, ""));
174
175 let port_ok = match p_port {
177 "" => o_port.is_empty(),
178 "*" => true,
179 _ => p_port == o_port,
180 };
181 if !port_ok {
182 return false;
183 }
184
185 if let Some(suffix) = p_host.strip_prefix("*.") {
188 o_host == suffix || o_host.ends_with(&format!(".{}", suffix))
189 } else {
190 p_host == o_host
191 }
192}
193
194#[cfg(test)]
195mod origin_tests {
196 use super::*;
197
198 #[test]
199 fn loopback_exact_hosts_allowed() {
200 assert!(is_loopback_origin("http://localhost"));
201 assert!(is_loopback_origin("https://localhost:3000"));
202 assert!(is_loopback_origin("http://127.0.0.1"));
203 assert!(is_loopback_origin("http://[::1]"));
204 }
205
206 #[test]
207 fn loopback_lookalikes_rejected() {
208 assert!(!is_loopback_origin("http://localhost.evil.com"));
210 assert!(!is_loopback_origin("http://127.0.0.1.evil.com"));
211 assert!(!is_loopback_origin("ws://localhost"));
212 assert!(!is_loopback_origin("ftp://localhost"));
213 }
214
215 #[test]
216 fn pattern_subdomain_matches_apex_and_children() {
217 assert!(match_origin_pattern(
218 "https://*.unpispas.es",
219 "https://unpispas.es"
220 ));
221 assert!(match_origin_pattern(
222 "https://*.unpispas.es",
223 "https://app.unpispas.es"
224 ));
225 assert!(match_origin_pattern(
226 "https://*.unpispas.es",
227 "https://a.b.unpispas.es"
228 ));
229 }
230
231 #[test]
232 fn pattern_wildcard_port() {
233 assert!(match_origin_pattern(
234 "https://*.unpispas.es:*",
235 "https://app.unpispas.es:8443"
236 ));
237 assert!(match_origin_pattern(
238 "https://*.unpispas.es:*",
239 "https://app.unpispas.es"
240 ));
241 }
242
243 #[test]
244 fn pattern_exact_port_enforced() {
245 assert!(match_origin_pattern(
246 "https://app.example.com:443",
247 "https://app.example.com:443"
248 ));
249 assert!(!match_origin_pattern(
250 "https://app.example.com:443",
251 "https://app.example.com:8080"
252 ));
253 }
254
255 #[test]
256 fn pattern_scheme_mismatch_rejected() {
257 assert!(!match_origin_pattern(
258 "https://*.unpispas.es",
259 "http://app.unpispas.es"
260 ));
261 }
262
263 #[test]
264 fn non_pna_origin_not_allowed() {
265 assert!(!is_origin_allowed("https://evil.com"));
266 assert!(!is_origin_allowed("https://unpispas.es.evil.com"));
267 }
268}
269
270fn build_tls_acceptor() -> std::result::Result<tokio_native_tls::TlsAcceptor, Box<dyn std::error::Error>> {
272 let identity = native_tls::Identity::from_pkcs8(LOCAL_TLS_CERT, LOCAL_TLS_KEY)?;
273 let tls = native_tls::TlsAcceptor::builder(identity).build()?;
274 Ok(tokio_native_tls::TlsAcceptor::from(tls))
275}
276
277pub async fn start_local_server(
283 host: &str,
284 port: u16,
285 services: Arc<HashMap<String, Arc<dyn Service>>>,
286 service_name: &str,
287 service_vers: &str,
288 use_tls: bool,
289) -> Result<(), std::io::Error> {
290 let addr = format!("{}:{}", host, port);
291
292 let tls_acceptor = if use_tls {
293 match build_tls_acceptor() {
294 Ok(a) => {
295 info!("TLS enabled for local server (wss://local.unpispas.es:{})", port);
296 Some(a)
297 }
298 Err(e) => {
299 error!("Failed to build TLS acceptor, falling back to plain WS: {}", e);
300 None
301 }
302 }
303 } else {
304 None
305 };
306
307 loop {
308 match TcpListener::bind(&addr).await {
309 Ok(listener) => {
310 let proto = if tls_acceptor.is_some() { "wss" } else { "ws" };
311 info!("Local WebSocket server listening on {} ({}) service name {}", addr, proto, service_name);
312
313 while let Ok((stream, peer)) = listener.accept().await {
314 if let Err(e) = stream.set_nodelay(true) {
315 debug!("set_nodelay (peer={}): {}", peer, e);
316 }
317 let services = Arc::clone(&services);
318 let service_name = service_name.to_string();
319 let service_vers = service_vers.to_string();
320 let tls = tls_acceptor.clone();
321
322 tokio::spawn(async move {
323 let mut first_byte = [0u8; 1];
325 let n = match stream.peek(&mut first_byte).await {
326 Ok(n) => n,
327 Err(e) => {
328 debug!("peek failed (peer={}): {}", peer, e);
329 return;
330 }
331 };
332 if n == 0 {
333 return;
334 }
335
336 let is_tls = first_byte[0] == 0x16;
337 if is_tls {
338 if let Some(acceptor) = tls {
340 match acceptor.accept(stream).await {
341 Ok(tls_stream) => {
342 if let Err(e) = websocket_handler(tls_stream, peer, services, &service_name, &service_vers).await {
343 error!("WSS handler error (peer={}): {}", peer, e);
344 }
345 }
346 Err(e) => {
347 debug!("TLS handshake failed (peer={}): {}", peer, e);
348 }
349 }
350 } else {
351 debug!("Got TLS handshake but TLS disabled (peer={})", peer);
352 }
353 } else {
354 if let Err(e) = websocket_handler(stream, peer, services, &service_name, &service_vers).await {
356 error!("WS handler error (peer={}): {}", peer, e);
357 }
358 }
359 });
360 }
361 }
362 Err(e) => {
363 error!("Failed to bind local server to {}: {}. Retrying in 5 seconds...", addr, e);
364 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
365 }
366 }
367 }
368}
369
370pub async fn process_request(
375 mut request: Value,
376 services: Arc<HashMap<String, Arc<dyn Service>>>,
377 service_name: &str,
378 service_vers: &str,
379 write: WebSocketWrite,
380) -> Value {
381 let uuid = request
383 .get("UUIDV4")
384 .or_else(|| request.get("MESSAGE_UUID"))
385 .cloned()
386 .unwrap_or(Value::Null);
387
388 if let Some(message_data) = request.get("MESSAGE_DATA") {
390 if message_data.is_object() {
391 info!("Normalizing MESSAGE_DATA for remote request");
392 request = message_data.clone();
393
394 if uuid != Value::Null && !request.get("UUIDV4").is_some() {
396 request["UUIDV4"] = uuid.clone();
397 }
398 }
399 }
400
401 if request.get("result").is_some() && request.get("server").is_some(){
403 info!("Received CONNECT response: {:?}", request);
404 return json!({
405 "SERVICE_NAME": service_name,
406 "SERVICE_VERS": service_vers,
407 "MESSAGE_TYPE": "INFO",
408 "MESSAGE_EXEC": "SUCCESS",
409 "MESSAGE_UUID": uuid,
410 "MESSAGE_DATA": "Server connection acknowledged",
411 });
412 }
413
414 if let Some(target) = request.get("TARGET").and_then(Value::as_str) {
416 if let Some(service) = services.get(target.to_lowercase().as_str()) {
417 let target_lc = target.to_lowercase();
418 let is_ping = target_lc == "base"
421 && request.get("ACTION").and_then(Value::as_str) == Some("PING");
422 if is_ping {
423 debug!("Running service: {}", target_lc);
424 } else {
425 info!("Running service: {}", target_lc);
426 }
427
428 let (status, response) = service.run(request.clone(), write).await;
430 let message_data = if let Ok(json_value) = serde_json::from_str::<Value>(&response) {
431 json_value
433 } else {
434 Value::String(response)
436 };
437 return json!({
438 "SERVICE_NAME": service_name,
439 "SERVICE_VERS": service_vers,
440 "MESSAGE_TYPE": "RESPONSE",
441 "MESSAGE_EXEC": if status == 0 { "SUCCESS" } else { "FAILURE" },
442 "MESSAGE_UUID": uuid,
443 "MESSAGE_DATA": message_data,
444 });
445 } else {
446 error!("Unknown service: {}", target);
447 return json!({
448 "SERVICE_NAME": service_name,
449 "SERVICE_VERS": service_vers,
450 "MESSAGE_TYPE": "RESPONSE",
451 "MESSAGE_EXEC": "FAILURE",
452 "MESSAGE_UUID": uuid,
453 "MESSAGE_DATA": format!("Unknown service: {}", target),
454 });
455 }
456 }
457
458 if request.get("SERVICE_NAME").is_some() {
459 error!("Missing TARGET in request.");
460 }
461
462
463 json!({
464 "SERVICE_NAME": service_name,
465 "SERVICE_VERS": service_vers,
466 "MESSAGE_TYPE": "RESPONSE",
467 "MESSAGE_EXEC": "FAILURE",
468 "MESSAGE_UUID": uuid,
469 "MESSAGE_DATA": "Missing TARGET in request.",
470 })
471}
472
473pub async fn websocket_handler<S>(
493 stream: S,
494 peer: SocketAddr,
495 services: Arc<HashMap<String, Arc<dyn Service>>>,
496 service_name: &str,
497 service_vers: &str,
498) -> Result<(), Box<dyn std::error::Error>>
499where
500 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
501{
502 let boxed = crate::prelude::BoxedStream(Box::new(stream));
503
504 let pna_callback = |req: &Request, mut response: Response| -> std::result::Result<Response, ErrorResponse> {
507 fn bad_request(reason: &str) -> ErrorResponse {
511 let mut resp = ErrorResponse::new(Some(reason.to_string()));
512 *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::BAD_REQUEST;
513 resp
514 }
515
516 let pna_true: tokio_tungstenite::tungstenite::http::HeaderValue =
520 "true".parse().expect("Access-Control-Allow-Private-Network value parses");
521 let vary_origin: tokio_tungstenite::tungstenite::http::HeaderValue =
522 "Origin".parse().expect("Vary header value parses");
523
524 let origin = req.headers().get("Origin").and_then(|v| v.to_str().ok());
525 match origin {
526 None => {
527 let h = response.headers_mut();
529 h.insert("Access-Control-Allow-Private-Network", pna_true);
530 Ok(response)
531 }
532 Some(o) if is_origin_allowed(o) => {
533 let origin_value = match o.parse::<tokio_tungstenite::tungstenite::http::HeaderValue>() {
537 Ok(v) => v,
538 Err(_) => {
539 error!("Rejected WS handshake: Origin header is not a valid HeaderValue: {}", o);
540 return Err(bad_request("Invalid Origin header"));
541 }
542 };
543 let h = response.headers_mut();
544 h.insert("Access-Control-Allow-Private-Network", pna_true);
545 h.insert("Access-Control-Allow-Origin", origin_value);
546 h.insert("Vary", vary_origin);
547 Ok(response)
548 }
549 Some(o) => {
550 error!("Rejected WS handshake from disallowed origin: {}", o);
551 let mut resp = ErrorResponse::new(Some("Forbidden origin".to_string()));
552 *resp.status_mut() = tokio_tungstenite::tungstenite::http::StatusCode::FORBIDDEN;
553 Err(resp)
554 }
555 }
556 };
557
558 let ws_stream = match accept_hdr_async(boxed, pna_callback).await {
559 Ok(ws) => {
560 debug!("WebSocket handshake OK (peer={})", peer);
561 ws
562 }
563 Err(e) => {
564 error!(
565 "WebSocket handshake failed (peer={}): {} (need RFC6455 WebSocket upgrade, e.g. browser WebSocket API)",
566 peer, e
567 );
568 return Err(e.into());
569 }
570 };
571 let (write, mut read) = ws_stream.split();
572
573 let rw_write = Arc::new(tokio::sync::RwLock::new(write));
575
576 while let Some(msg) = read.next().await {
577 match msg {
578 Ok(Message::Text(text)) => {
579 let parsed = serde_json::from_str::<serde_json::Value>(&text);
580
581 let is_ping = parsed
584 .as_ref()
585 .ok()
586 .and_then(|req| req.get("ACTION"))
587 .and_then(Value::as_str)
588 == Some("PING");
589 let truncated = if text.len() > 200 {
590 format!("{}...", &text[..200])
591 } else {
592 text.clone()
593 };
594 if is_ping {
595 debug!("Received text message HANDLER (peer={}): {}", peer, truncated);
596 } else {
597 info!("Received text message HANDLER (peer={}): {}", peer, truncated);
598 }
599
600 match parsed {
601 Ok(request) => {
602 let response_message = process_request(
603 request,
604 Arc::clone(&services),
605 service_name,
606 service_vers,
607 Some(Arc::clone(&rw_write)),
608 )
609 .await;
610
611 let mut ws = rw_write.write().await;
612 if let Err(e) = ws.send(Message::Text(response_message.to_string())).await {
613 error!("Failed to send response: {}", e);
614 }
615 }
616 Err(e) => {
617 let error_response = json!({
619 "SERVICE_NAME": service_name,
620 "SERVICE_VERS": service_vers,
621 "MESSAGE_TYPE": "RESPONSE",
622 "MESSAGE_EXEC": "FAILURE",
623 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
624 });
625 let mut ws = rw_write.write().await;
626 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
627 error!("Failed to send error response: {}", e);
628 }
629 error!("Invalid JSON: {}", e);
630 }
631 }
632 }
633 Err(e) => {
634 let error_response = json!({
635 "SERVICE_NAME": service_name,
636 "SERVICE_VERS": service_vers,
637 "MESSAGE_TYPE": "RESPONSE",
638 "MESSAGE_EXEC": "FAILURE",
639 "MESSAGE_DATA": format!("Invalid JSON: {}", e),
640 });
641 let mut ws = rw_write.write().await;
642 if let Err(e) = ws.send(Message::Text(error_response.to_string())).await {
643 error!("Failed to send error response: {}", e);
644 }
645 debug!("Error processing message: {}", e)
646 },
647 _ => {}
648 }
649 }
650
651 Ok(())
652}
653
654pub async fn start_remote_server(
659 remote_host: &str,
660 remote_port: u16,
661 service_name: &str,
662 service_vers: &str,
663 services: Arc<HashMap<String, Arc<dyn Service>>>,
664) -> Result<(), std::io::Error> {
665 loop {
666 match connect_to_remote_server(
667 remote_host,
668 remote_port,
669 service_name,
670 service_vers,
671 Arc::clone(&services),
672 false,
673 )
674 .await
675 {
676 Ok(_) => info!("Successfully connected to remote server."),
677 Err(e) => {
678 error!("Failed to connect to remote server: {:?}", e);
679 info!("Reconnecting in 5 seconds...");
680 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
681 }
682 }
683 }
684}
685
686async fn connect_to_remote_server(
691 remote_host: &str,
692 remote_port: u16,
693 service_name: &str,
694 service_vers: &str,
695 services: Arc<HashMap<String, Arc<dyn Service>>>,
696 return_after_connect: bool,
697) -> Result<(), Box<dyn std::error::Error>> {
698 let protocol = if std::env::var("REMOTE_USSL").unwrap_or_else(|_| "true".to_string()) == "true" {
699 "wss"
700 } else {
701 "ws"
702 };
703
704 let url = format!("{}://{}:{}", protocol, remote_host, remote_port);
705 info!("Connecting to remote WebSocket server at: {}", url);
706
707 let ws_url = Url::parse(&url)?;
708 let (ws_stream, _) = connect_async(ws_url.as_str()).await?;
709 let (mut write, mut read) = ws_stream.split();
710
711 let connect_message = json!({
712 "SERVICE_NAME": service_name,
713 "SERVICE_VERS": sharing::VERSION,
714 "MESSAGE_TYPE": "CONNECT",
715 "MODULE_LIST": services.iter().map(|(name, service)| {
716 json!({
717 "MODULE_NAME": name,
718 "MODULE_VERS": service.get_version(),
719 })
720 }).collect::<Vec<_>>(),
721 });
722
723 write.send(Message::Text(connect_message.to_string())).await?;
724 info!("CONNECT message sent.");
725
726 if return_after_connect {
727 return Ok(());
728 }
729
730 let write_p = Arc::new(tokio::sync::Mutex::new(write));
731 let service_name = service_name.to_owned();
732 let service_vers = service_vers.to_owned();
733
734 let cancel_token = Arc::new(tokio::sync::Notify::new());
740
741 let keep_alive_write = Arc::clone(&write_p);
743 let keep_alive_cancel = Arc::clone(&cancel_token);
744 let keep_alive_task = tokio::spawn(async move {
745 let mut interval = tokio::time::interval(std::time::Duration::from_secs(25));
746
747 loop {
748 interval.tick().await;
749
750 let mut ws = keep_alive_write.lock().await;
751 if let Err(e) = ws.send(Message::Ping(vec![])).await {
752 error!("Keep-alive failed: {}", e);
753 keep_alive_cancel.notify_one();
757 break;
758 }
759 }
760 });
761
762 const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(45);
766
767 let read_cancel = Arc::clone(&cancel_token);
768 let result = async {
769 loop {
770 tokio::select! {
771 biased;
772 _ = read_cancel.notified() => {
773 error!("Keep-alive task signalled connection loss, breaking read loop");
774 break;
775 }
776 next = tokio::time::timeout(READ_TIMEOUT, read.next()) => {
777 match next {
778 Ok(Some(Ok(Message::Text(text)))) => {
779 info!("Received message");
780 if let Ok(request) = serde_json::from_str::<serde_json::Value>(&text) {
781 let response = process_request(
782 request,
783 Arc::clone(&services),
784 &service_name,
785 &service_vers,
786 None,
787 ).await;
788
789 let mut ws = write_p.lock().await;
790 ws.send(Message::Text(response.to_string())).await?;
791 }
792 }
793 Ok(Some(Ok(Message::Ping(ping)))) => {
794 debug!("Received PING from server, sending PONG");
795 let mut ws = write_p.lock().await;
796 ws.send(Message::Pong(ping)).await?;
797 }
798 Ok(Some(Ok(Message::Pong(_)))) => {
799 }
801 Ok(Some(Ok(Message::Close(_)))) => {
802 error!("Connection closed by server");
803 break;
804 }
805 Ok(Some(Err(e))) => {
806 error!("WebSocket error: {}", e);
807 break;
808 }
809 Ok(None) => {
810 error!("Connection closed");
811 break;
812 }
813 Err(_) => {
814 error!(
815 "No activity in {} seconds, reconnecting",
816 READ_TIMEOUT.as_secs()
817 );
818 break;
819 }
820 Ok(Some(Ok(_))) => {}
824 }
825 }
826 }
827 }
828 Ok::<(), Box<dyn std::error::Error>>(())
829 }.await;
830
831 keep_alive_task.abort();
832 let _ = keep_alive_task.await;
833 result
834}
835
836pub async fn send_message(write: &WebSocketWrite, message_data: String) {
842 if let Some(ws_lock) = &write {
843 let mut ws = ws_lock.write().await;
844
845 match ws.send(Message::Text(message_data.clone())).await {
846 Ok(_) => {
847 info!("Message sent successfully: {}", message_data);
848 }
849 Err(e) => {
850 error!("Failed to send message: {:?}", e);
851 }
852 }
853 } else {
854 info!("No WebSocket available to send message.");
855 }
856}
857