1use crate::service::{Service, WebSocketWrite};
2use async_trait::async_trait;
3use futures_util::SinkExt;
4use serde_json::{json, Value};
5use std::any::Any;
6use std::sync::Arc;
7use tokio_tungstenite::tungstenite::protocol::Message;
8use easy_trace::prelude::{error, info, warn};
9
/// Version string reported by `PaytefService::get_version`.
const PAYTEF_VERSION: &str = "1.0.0";

/// Pause between two consecutive `/transaction/poll` requests, in milliseconds.
const POLL_INTERVAL_MS: u64 = 1_000;

/// Upper bound, in seconds, on how long a started transaction is polled
/// before the pinpad is cancelled and the transaction is reported as timed out.
const TRANSACTION_TIMEOUT_SECS: u64 = 5 * 60;

/// Key under which the map of remembered sales is kept in the persistence cache.
const CACHE_KEY: &str = "paytef_sales";

/// Lifetime of a remembered sale, in seconds (24 hours).
const CACHE_TTL_SECS: u64 = 24 * 60 * 60;
33
/// Service that drives a Paytef pinpad through its HTTP endpoints
/// (`/transaction/*`, `/pinpad/*`, `/printer/print`) and reports transaction
/// progress over the WebSocket handle passed to `run`.
pub struct PaytefService {
    /// Optional persistence backend used to remember approved sales under
    /// `CACHE_KEY`. When `None`, sales are never cached and lookups find nothing.
    persistence: Option<Arc<crate::persistence::PersistenceService>>,
}
45
46impl PaytefService {
47 pub fn new(persistence: Option<Arc<crate::persistence::PersistenceService>>) -> Self {
48 info!("PaytefService initialized");
49 Self { persistence }
50 }
51
52 fn now_secs() -> u64 {
53 std::time::SystemTime::now()
54 .duration_since(std::time::UNIX_EPOCH)
55 .map(|d| d.as_secs())
56 .unwrap_or(0)
57 }
58
59 fn prune_expired(map: &mut serde_json::Map<String, Value>, now: u64, ttl: u64) -> usize {
65 let before = map.len();
66 map.retain(|_, v| {
67 let created = v.get("created_at").and_then(|c| c.as_u64()).unwrap_or(0);
68 created > 0 && now.saturating_sub(created) < ttl
69 });
70 before - map.len()
71 }
72
73 async fn load_sales(&self) -> serde_json::Map<String, Value> {
79 let persistence = match &self.persistence {
80 Some(p) => p,
81 None => return serde_json::Map::new(),
82 };
83 let cache = persistence.cache.lock().await;
84 cache.get(CACHE_KEY)
85 .and_then(|v| v.as_object())
86 .cloned()
87 .unwrap_or_default()
88 }
89
90 async fn save_sales(&self, map: serde_json::Map<String, Value>) {
91 let persistence = match &self.persistence {
92 Some(p) => p,
93 None => return,
94 };
95 if let Err(e) = persistence.set_value(CACHE_KEY.to_string(), Value::Object(map)).await {
96 warn!("Failed to persist paytef sales cache: {}", e);
97 }
98 }
99
100 fn base_url(host: &str, port: u16) -> String {
101 format!("http://{}:{}", host, port)
102 }
103
104 async fn send_ws_message(write: &WebSocketWrite, message: Value) {
105 if let Some(ws_lock) = write {
106 let serialized = Message::Text(message.to_string());
107 let mut ws = ws_lock.write().await;
108 if let Err(e) = ws.send(serialized).await {
109 error!("Failed to send WebSocket message: {}", e);
110 }
111 }
112 }
113
114 async fn http_post(url: &str, body: &Value) -> Result<Value, String> {
115 let client = reqwest::Client::builder()
121 .http1_only()
122 .timeout(std::time::Duration::from_secs(10))
123 .build()
124 .map_err(|e| format!("HTTP client build failed: {}", e))?;
125
126 let response = client
127 .post(url)
128 .json(body)
129 .send()
130 .await
131 .map_err(|e| {
132 let mut cause = format!("{}", e);
135 let mut src: Option<&(dyn std::error::Error + 'static)> =
136 std::error::Error::source(&e);
137 while let Some(s) = src {
138 cause.push_str(&format!(" | caused by: {}", s));
139 src = s.source();
140 }
141 format!("HTTP request failed: {}", cause)
142 })?;
143
144 let status = response.status();
145 let text = response
146 .text()
147 .await
148 .unwrap_or_else(|_| "{}".to_string());
149
150 if !status.is_success() {
151 return Err(format!("HTTP {}: {}", status, text));
152 }
153
154 serde_json::from_str(&text)
155 .map_err(|e| format!("JSON parse error: {} - body: {}", e, text))
156 }
157
158 async fn transaction(
164 op_type: &str,
165 host: &str,
166 port: u16,
167 amount: i64,
168 pinpad: &str,
169 reference: &str,
170 previous_operation_number: Option<i64>,
171 write: &WebSocketWrite,
172 ) -> Result<Value, String> {
173 let base = Self::base_url(host, port);
174
175 info!(op_type = op_type, host = host, port = port, amount = amount, "Starting Paytef transaction");
177 let mut start_body = json!({
178 "language": "es",
179 "pinpad": pinpad,
180 "executeOptions": { "method": "polling" },
181 "opType": op_type,
182 "requestedAmount": amount,
183 "createReceipt": false,
184 "showResultSeconds": 5,
185 "transactionReference": reference
186 });
187
188 if let Some(prev) = previous_operation_number {
189 start_body["operationDetails"] = json!({
190 "previousOperationNumber": prev
191 });
192 }
193
194 let start_resp = Self::http_post(
195 &format!("{}/transaction/start", base),
196 &start_body,
197 ).await?;
198
199 let started = start_resp
200 .get("info")
201 .and_then(|i| i.get("started"))
202 .and_then(|s| s.as_bool())
203 .unwrap_or(false);
204
205 if !started {
206 let msg = start_resp
207 .get("info")
208 .and_then(|i| i.get("message"))
209 .and_then(|m| m.as_str())
210 .unwrap_or("Unknown error");
211 return Err(format!("Failed to start transaction: {}", msg));
212 }
213
214 let session_id = start_resp
215 .get("info")
216 .and_then(|i| i.get("sessionID"))
217 .and_then(|s| s.as_str())
218 .unwrap_or("")
219 .to_string();
220
221 info!(session_id = %session_id, "Paytef transaction started");
222
223 Self::send_ws_message(write, json!({
225 "status": "started",
226 "opType": op_type,
227 "sessionID": session_id
228 })).await;
229
230 let poll_body = json!({ "pinpad": pinpad });
239 let deadline = tokio::time::Instant::now()
240 + std::time::Duration::from_secs(TRANSACTION_TIMEOUT_SECS);
241 loop {
242 if tokio::time::Instant::now() >= deadline {
243 warn!(
244 op_type = op_type,
245 session_id = %session_id,
246 timeout_secs = TRANSACTION_TIMEOUT_SECS,
247 "Paytef transaction poll timed out, cancelling"
248 );
249 Self::send_ws_message(write, json!({
250 "status": "progress",
251 "opType": op_type,
252 "transactionStatus": "timeout",
253 "sessionID": session_id
254 })).await;
255 if let Err(e) = Self::cancel(host, port, pinpad).await {
258 warn!(error = %e, "Cancel after timeout failed");
259 }
260 return Err(format!(
261 "Transaction timed out after {}s without reaching a terminal status",
262 TRANSACTION_TIMEOUT_SECS
263 ));
264 }
265
266 tokio::time::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS)).await;
267
268 let poll_resp = Self::http_post(
269 &format!("{}/transaction/poll", base),
270 &poll_body,
271 ).await?;
272
273 let poll_info = poll_resp.get("info").cloned().unwrap_or(json!({}));
274 let tx_status = poll_info.get("transactionStatus")
275 .and_then(|s| s.as_str())
276 .unwrap_or("");
277 let card_status = poll_info.get("cardStatus")
278 .and_then(|s| s.as_str())
279 .unwrap_or("");
280
281 Self::send_ws_message(write, json!({
283 "status": "progress",
284 "opType": op_type,
285 "transactionStatus": tx_status,
286 "cardStatus": card_status,
287 "sessionID": session_id
288 })).await;
289
290 if let Some(result) = poll_resp.get("result") {
292 if !result.is_null() {
293 break;
294 }
295 }
296
297 if tx_status == "finished" || tx_status == "cancelled" || tx_status == "error" {
299 break;
300 }
301 }
302
303 let result_resp = Self::http_post(
305 &format!("{}/transaction/result", base),
306 &poll_body,
307 ).await?;
308
309 let result = result_resp.get("result").cloned().unwrap_or(Value::Null);
310
311 if result.is_null() {
312 return Err("Transaction cancelled or no result".to_string());
313 }
314
315 let approved = result.get("approved")
316 .and_then(|a| a.as_bool())
317 .unwrap_or(false);
318
319 let response = json!({
320 "status": if approved { "approved" } else { "denied" },
321 "opType": op_type,
322 "approved": approved,
323 "resultCode": result.get("resultCode"),
324 "resultText": result.get("resultText"),
325 "authorisationCode": result.get("authorisationCode"),
326 "cardInformation": result.get("cardInformation"),
327 "requestedAmount": result.get("requestedAmount"),
328 "paytefOperationNumber": result.get("paytefOperationNumber"),
329 "sessionID": session_id
330 });
331
332 info!(
333 op_type = op_type,
334 approved = approved,
335 session_id = %session_id,
336 paytef_op = ?result.get("paytefOperationNumber"),
337 "Paytef transaction completed"
338 );
339
340 Ok(response)
341 }
342
343 async fn cancel(host: &str, port: u16, pinpad: &str) -> Result<Value, String> {
344 let base = Self::base_url(host, port);
345 let body = json!({ "pinpad": pinpad });
346 let resp = Self::http_post(&format!("{}/pinpad/cancel", base), &body).await?;
347 Ok(resp)
348 }
349
350 async fn status(host: &str, port: u16) -> Result<Value, String> {
351 let base = Self::base_url(host, port);
352 let body = json!({ "pinpad": "*" });
353 let resp = Self::http_post(&format!("{}/pinpad/status", base), &body).await?;
354 Ok(resp)
355 }
356
357 async fn print(host: &str, port: u16, content: &str, pinpad: &str) -> Result<Value, String> {
358 let base = Self::base_url(host, port);
359 let body = json!({
360 "pinpad": pinpad,
361 "content": content,
362 "contentType": "html",
363 "usePrinter": true,
364 "generateImage": false
365 });
366 let resp = Self::http_post(&format!("{}/printer/print", base), &body).await?;
367 Ok(resp)
368 }
369
370 async fn lookup_sale(&self, reference: &str) -> Option<Value> {
375 if reference.is_empty() {
376 return None;
377 }
378 let map = self.load_sales().await;
379 let entry = map.get(reference)?.clone();
380 let created = entry.get("created_at").and_then(|c| c.as_u64()).unwrap_or(0);
381 if created == 0 || Self::now_secs().saturating_sub(created) >= CACHE_TTL_SECS {
382 return None;
383 }
384 Some(entry)
385 }
386
387 async fn remember_sale(
391 &self,
392 reference: &str,
393 paytef_operation_number: i64,
394 amount: i64,
395 host: &str,
396 port: u16,
397 pinpad: &str,
398 ) {
399 if reference.is_empty() {
400 return;
401 }
402 if self.persistence.is_none() {
403 warn!("Paytef sale not cached — no persistence service available");
404 return;
405 }
406 let now = Self::now_secs();
407 let mut map = self.load_sales().await;
408 map.insert(reference.to_string(), json!({
409 "paytef_operation_number": paytef_operation_number,
410 "amount": amount,
411 "host": host,
412 "port": port,
413 "pinpad": pinpad,
414 "created_at": now,
415 }));
416 let pruned = Self::prune_expired(&mut map, now, CACHE_TTL_SECS);
417 if pruned > 0 {
418 info!("Paytef cache: pruned {} expired sale(s) on write", pruned);
419 }
420 self.save_sales(map).await;
421 }
422
423 async fn forget_sale(&self, reference: &str) {
424 if reference.is_empty() {
425 return;
426 }
427 if self.persistence.is_none() {
428 return;
429 }
430 let mut map = self.load_sales().await;
431 if map.remove(reference).is_some() {
432 self.save_sales(map).await;
433 }
434 }
435}
436
437#[async_trait]
438impl Service for PaytefService {
439 async fn run(&self, action: Value, write: WebSocketWrite) -> (i32, String) {
440 let owned: Value = match action.get("ACTION") {
446 Some(v) if v.is_object() => v.clone(),
447 Some(v) if v.is_string() => match serde_json::from_str::<Value>(v.as_str().unwrap_or("")) {
448 Ok(parsed) if parsed.is_object() => parsed,
449 _ => {
450 error!("Invalid action format: ACTION string is not a JSON object");
451 return (1, "Invalid action format: ACTION string is not a JSON object".to_string());
452 }
453 },
454 _ => {
455 error!("Invalid action format: missing 'ACTION'");
456 return (1, "Invalid action format: missing 'ACTION'".to_string());
457 }
458 };
459 let action_data = owned.as_object().expect("owned guaranteed to be object");
460
461 let command = action_data
462 .get("command")
463 .and_then(|c| c.as_str())
464 .unwrap_or("");
465
466 let host = action_data
467 .get("host")
468 .and_then(|h| h.as_str())
469 .unwrap_or("127.0.0.1");
470
471 let port = action_data
472 .get("port")
473 .and_then(|p| p.as_u64())
474 .unwrap_or(8887) as u16;
475
476 let pinpad = action_data
477 .get("pinpad")
478 .and_then(|p| p.as_str())
479 .unwrap_or("*");
480
481 match command {
482 "SALE" => {
483 let amount = action_data
484 .get("amount")
485 .and_then(|a| a.as_i64())
486 .unwrap_or(0);
487
488 let reference = action_data
489 .get("reference")
490 .and_then(|r| r.as_str())
491 .unwrap_or("");
492
493 if amount <= 0 {
494 return (1, "Invalid amount: must be > 0".to_string());
495 }
496
497 match Self::transaction("sale", host, port, amount, pinpad, reference, None, &write).await {
498 Ok(result) => {
499 let approved = result.get("approved").and_then(|a| a.as_bool()).unwrap_or(false);
502 let op_number = result.get("paytefOperationNumber").and_then(|n| n.as_i64());
503 if approved {
504 if let Some(op) = op_number {
505 self.remember_sale(reference, op, amount, host, port, pinpad).await;
506 }
507 else {
508 warn!(reference = reference, "Paytef sale approved but no paytefOperationNumber in result — skipping cache (referenced refund won't be possible)");
509 }
510 }
511 (0, result.to_string())
512 }
513 Err(e) => {
514 error!(error = %e, "Paytef SALE failed");
515 (1, json!({ "status": "error", "error": e }).to_string())
516 }
517 }
518 }
519
520 "REFUND" => {
521 let reference = action_data
522 .get("reference")
523 .and_then(|r| r.as_str())
524 .unwrap_or("");
525
526 let explicit_amount = action_data
529 .get("amount")
530 .and_then(|a| a.as_i64());
531
532 let cached = self.lookup_sale(reference).await;
533
534 let cached_amount = cached.as_ref().and_then(|v| v.get("amount")).and_then(|a| a.as_i64());
536 let cached_op = cached.as_ref().and_then(|v| v.get("paytef_operation_number")).and_then(|a| a.as_i64());
537 let cached_host = cached.as_ref().and_then(|v| v.get("host")).and_then(|a| a.as_str()).map(|s| s.to_string());
538 let cached_port = cached.as_ref().and_then(|v| v.get("port")).and_then(|a| a.as_u64()).map(|p| p as u16);
539 let cached_pinpad = cached.as_ref().and_then(|v| v.get("pinpad")).and_then(|a| a.as_str()).map(|s| s.to_string());
540
541 let (amount, prev_op, eff_host, eff_port, eff_pinpad) = match (cached.is_some(), explicit_amount) {
542 (true, None) => (
544 cached_amount.unwrap_or(0),
545 cached_op,
546 cached_host.unwrap_or_else(|| host.to_string()),
547 cached_port.unwrap_or(port),
548 cached_pinpad.unwrap_or_else(|| pinpad.to_string()),
549 ),
550 (true, Some(amt)) => (
552 amt,
553 cached_op,
554 cached_host.unwrap_or_else(|| host.to_string()),
555 cached_port.unwrap_or(port),
556 cached_pinpad.unwrap_or_else(|| pinpad.to_string()),
557 ),
558 (false, Some(amt)) => (
561 amt,
562 None,
563 host.to_string(),
564 port,
565 pinpad.to_string(),
566 ),
567 (false, None) => {
569 return (1, "Missing amount and no cached sale for reference".to_string());
570 }
571 };
572
573 if amount <= 0 {
574 return (1, "Invalid amount: must be > 0".to_string());
575 }
576
577 match Self::transaction("refund", &eff_host, eff_port, amount, &eff_pinpad, reference, prev_op, &write).await {
578 Ok(result) => {
579 let approved = result.get("approved").and_then(|a| a.as_bool()).unwrap_or(false);
580 if approved && cached.is_some() {
581 let full = cached_amount == Some(amount);
585 if full {
586 self.forget_sale(reference).await;
587 }
588 }
589 (0, result.to_string())
590 }
591 Err(e) => {
592 error!(error = %e, "Paytef REFUND failed");
593 (1, json!({ "status": "error", "error": e }).to_string())
594 }
595 }
596 }
597
598 "CANCEL" => {
599 match Self::cancel(host, port, pinpad).await {
600 Ok(result) => (0, result.to_string()),
601 Err(e) => {
602 error!(error = %e, "Paytef CANCEL failed");
603 (1, json!({ "status": "error", "error": e }).to_string())
604 }
605 }
606 }
607
608 "STATUS" => {
609 match Self::status(host, port).await {
610 Ok(result) => (0, result.to_string()),
611 Err(e) => {
612 error!(error = %e, "Paytef STATUS failed");
613 (1, json!({ "status": "error", "error": e }).to_string())
614 }
615 }
616 }
617
618 "PRINT" => {
619 let content = action_data
620 .get("content")
621 .and_then(|c| c.as_str())
622 .unwrap_or("");
623
624 if content.is_empty() {
625 return (1, "Missing print content".to_string());
626 }
627
628 match Self::print(host, port, content, pinpad).await {
629 Ok(result) => (0, result.to_string()),
630 Err(e) => {
631 error!(error = %e, "Paytef PRINT failed");
632 (1, json!({ "status": "error", "error": e }).to_string())
633 }
634 }
635 }
636
637 _ => {
638 error!(command = command, "Unknown Paytef command");
639 (1, format!("Unknown command: {}", command))
640 }
641 }
642 }
643
    /// Exposes the service as `&dyn Any` so a caller holding it behind the
    /// `Service` trait can downcast to the concrete `PaytefService`.
    fn as_any(&self) -> &dyn Any {
        self
    }
647
    /// Logs that the service has stopped; the method does nothing else.
    fn stop_service(&self) {
        info!("PaytefService stopped.");
    }
651
652 fn get_version(&self) -> String {
653 PAYTEF_VERSION.to_string()
654 }
655}
656
#[cfg(test)]
mod tests {
    //! Unit tests for request validation and cache pruning. None of them
    //! reaches the network: every `run` case returns before an HTTP call.

    use super::*;

    #[test]
    fn test_base_url() {
        assert_eq!(
            PaytefService::base_url("192.168.1.178", 8887),
            "http://192.168.1.178:8887"
        );
    }

    #[test]
    fn test_base_url_localhost() {
        assert_eq!(
            PaytefService::base_url("127.0.0.1", 8887),
            "http://127.0.0.1:8887"
        );
    }

    #[tokio::test]
    async fn test_sale_invalid_amount() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "SALE",
                "host": "127.0.0.1",
                "port": 8887,
                "amount": 0,
                "pinpad": "*"
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Invalid amount"));
    }

    /// A negative amount is rejected like a zero amount.
    #[tokio::test]
    async fn test_sale_negative_amount() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "SALE",
                "host": "127.0.0.1",
                "port": 8887,
                "amount": -5
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Invalid amount"));
    }

    #[tokio::test]
    async fn test_missing_action() {
        let service = PaytefService::new(None);
        let action = json!({ "no_action": true });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("missing 'ACTION'"));
    }

    /// `ACTION` that is neither an object nor a string counts as missing.
    #[tokio::test]
    async fn test_action_wrong_type() {
        let service = PaytefService::new(None);
        let action = json!({ "ACTION": 5 });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("missing 'ACTION'"));
    }

    /// `ACTION` given as a JSON-encoded string is parsed and dispatched.
    #[tokio::test]
    async fn test_action_as_json_string() {
        let service = PaytefService::new(None);
        let action = json!({ "ACTION": "{\"command\":\"UNKNOWN_CMD\"}" });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Unknown command: UNKNOWN_CMD"));
    }

    /// A string `ACTION` must decode to a JSON object.
    #[tokio::test]
    async fn test_action_string_not_json_object() {
        let service = PaytefService::new(None);

        for raw in &["not json", "[1, 2]"] {
            let action = json!({ "ACTION": *raw });
            let (status, msg) = service.run(action, None).await;
            assert_eq!(status, 1);
            assert!(msg.contains("ACTION string is not a JSON object"));
        }
    }

    #[tokio::test]
    async fn test_unknown_command() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "UNKNOWN_CMD",
                "host": "127.0.0.1",
                "port": 8887
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Unknown command"));
    }

    #[tokio::test]
    async fn test_refund_no_reference_no_amount() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "REFUND",
                "host": "127.0.0.1",
                "port": 8887
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Missing amount"));
    }

    /// An explicit non-positive refund amount is rejected before any request.
    #[tokio::test]
    async fn test_refund_zero_amount() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "REFUND",
                "host": "127.0.0.1",
                "port": 8887,
                "reference": "order-1",
                "amount": 0
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Invalid amount"));
    }

    /// PRINT without content fails validation.
    #[tokio::test]
    async fn test_print_missing_content() {
        let service = PaytefService::new(None);
        let action = json!({
            "ACTION": {
                "command": "PRINT",
                "host": "127.0.0.1",
                "port": 8887
            }
        });

        let (status, msg) = service.run(action, None).await;
        assert_eq!(status, 1);
        assert!(msg.contains("Missing print content"));
    }

    #[test]
    fn test_prune_expired_drops_old_entries() {
        let mut map = serde_json::Map::new();
        map.insert("fresh".into(), json!({ "created_at": 99000_u64 }));
        map.insert("expired".into(), json!({ "created_at": 90000_u64 }));
        map.insert("no_timestamp".into(), json!({}));
        map.insert("zero".into(), json!({ "created_at": 0_u64 }));

        let dropped = PaytefService::prune_expired(&mut map, 100000, 3600);
        assert_eq!(dropped, 3);
        assert!(map.contains_key("fresh"));
        assert!(!map.contains_key("expired"));
        assert!(!map.contains_key("no_timestamp"));
        assert!(!map.contains_key("zero"));
    }

    #[test]
    fn test_prune_expired_keeps_everything_within_ttl() {
        let mut map = serde_json::Map::new();
        map.insert("a".into(), json!({ "created_at": 1000_u64 }));
        map.insert("b".into(), json!({ "created_at": 2000_u64 }));

        let dropped = PaytefService::prune_expired(&mut map, 2500, 3600);
        assert_eq!(dropped, 0);
        assert_eq!(map.len(), 2);
    }

    /// An entry exactly `ttl` seconds old is expired, one second younger is
    /// kept, and a timestamp in the future counts as age zero.
    #[test]
    fn test_prune_expired_ttl_boundary() {
        let mut map = serde_json::Map::new();
        map.insert("at_ttl".into(), json!({ "created_at": 1000_u64 }));
        map.insert("just_inside".into(), json!({ "created_at": 1001_u64 }));
        map.insert("future".into(), json!({ "created_at": 5000_u64 }));

        let dropped = PaytefService::prune_expired(&mut map, 4600, 3600);
        assert_eq!(dropped, 1);
        assert!(!map.contains_key("at_ttl"));
        assert!(map.contains_key("just_inside"));
        assert!(map.contains_key("future"));
    }
}