Skip to main content

Grove/Protocol/
SpineConnection.rs

1//! Spine Connection Module
2//!  ☀️ 🟡 MOUNTAIN_GROVE_WASM - WASM+Rhai extension host connection
3//!
4//! This module provides gRPC-based communication for extension host
5//! integration. Maintains full backwards compatibility while adding optional
6//! EchoAction support.
7//!
8//! ## Architecture (Backwards Compatible)
9//!
10//! - **Legacy RPC Layer**: Original gRPC client (unchanged)
11//! - **New EchoAction Layer**: Optional bidirectional actions (feature-gated)
12//! - **Dual Protocol**: Both can be used simultaneously
13//!
14//! ## Feature Gates
15//!
16//! - `grove_rpc` (default) - Enable legacy RPC layer
17//! - `grove_echo` (new, feature-gated) - Enable EchoAction layer
18//!
19//! ## Usage
20//!
//! ### Legacy (Unchanged)
//!
//! ```rust,ignore
//! use crate::Protocol::ProtocolConfig;
//! let mut connection = SpineConnection::new(config);
//! connection.Connect().await?;
//! let response = connection.SendRequest(method, payload).await?;
//! ```
//!
//! ### With EchoAction (New, Optional)
//!
//! ```rust,ignore
//! let mut connection = SpineConnection::new(config);
//! connection.Connect().await?;
//! connection.ConnectEchoClient().await?;
//!
//! // Use either method
//! let response = connection.SendRequest(method, payload).await?; // OLD: still works
//! let echo_response = connection.SendEchoAction(action).await?; // NEW: optional
//! ```
36
#[cfg(feature = "grove_echo")]
use std::collections::HashMap;
use std::sync::Arc;

#[cfg(feature = "grove_echo")]
use anyhow::Context;
use anyhow::Result;
use tokio::sync::RwLock;

use crate::{Protocol::ProtocolConfig, dev_log};
#[cfg(feature = "grove_echo")]
use crate::vine::generated::vine::{
	EchoAction,
	EchoActionResponse,
	echo_action_service_client::EchoActionServiceClient,
};
49
/// Lifecycle state of a Spine connection.
///
/// The value is a small `Copy` tag, so it can be read out of a lock and
/// compared with `==` without borrowing. `Eq` is derived alongside
/// `PartialEq` because equality on a fieldless enum is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
	/// Not connected to Spine (the state a new connection starts in, and the
	/// state set by `Disconnect`).
	Disconnected,
	/// A connection attempt to Spine is in progress.
	Connecting,
	/// Connected to Spine; requests are accepted.
	Connected,
	/// The connection is in an error state. Nothing in this file assigns this
	/// variant yet.
	Error,
}
62
/// Heartbeat configuration for connection monitoring.
///
/// Derives `PartialEq`/`Eq` so two configurations can be compared directly
/// (for example in tests or to detect a configuration change).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between heartbeats, in seconds.
	pub interval_seconds:u64,
	/// Maximum number of missed heartbeats before the connection is
	/// considered lost.
	pub max_missed:u32,
	/// Whether heartbeat monitoring is enabled.
	pub enabled:bool,
}

/// Default heartbeat settings: enabled, one heartbeat every 30 seconds,
/// tolerating up to 3 missed heartbeats.
impl Default for HeartbeatConfig {
	fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
}
78
/// Connection metrics for monitoring.
///
/// All counters start at zero (`Default`). The type derives `PartialEq`/`Eq`
/// so a snapshot can be compared against an expected value.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectionMetrics {
	/// Total number of requests sent.
	pub total_requests:u64,
	/// Number of requests that completed successfully.
	pub successful_requests:u64,
	/// Number of requests that failed. Not incremented anywhere in this file.
	pub failed_requests:u64,
	/// Connection uptime in seconds. Not updated anywhere in this file.
	pub uptime_seconds:u64,
	/// Number of reconnection attempts. Not updated anywhere in this file.
	pub reconnections:u64,
}
93
94/// Spine connection implementation
95pub struct SpineConnectionImpl {
96	/// Protocol configuration
97	config:Arc<RwLock<ProtocolConfig>>,
98	/// Current connection state
99	state:Arc<RwLock<ConnectionState>>,
100
101	#[cfg(feature = "grove_echo")]
102	/// Echo client for testing
103	echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
104
105	/// Heartbeat configuration
106	heartbeat_config:HeartbeatConfig,
107	/// Timestamp of the last heartbeat
108	last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
109	/// Connection metrics
110	metrics:Arc<RwLock<ConnectionMetrics>>,
111}
112
113impl SpineConnectionImpl {
114	/// Create a new Spine connection
115	///
116	/// # Arguments
117	///
118	/// * `config` - Protocol configuration
119	///
120	/// # Returns
121	///
122	/// A new SpineConnectionImpl instance
123	pub fn new(config:ProtocolConfig) -> Self {
124		Self {
125			config:Arc::new(RwLock::new(config)),
126			state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
127
128			#[cfg(feature = "grove_echo")]
129			echo_client:None,
130
131			heartbeat_config:HeartbeatConfig::default(),
132			last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
133			metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
134		}
135	}
136
137	/// Connect to the Spine service
138	pub async fn Connect(&mut self) -> Result<()> {
139		let guard = self.config.read().await;
140		let url = guard.mountain_endpoint.clone();
141		drop(guard);
142
143		dev_log!("grpc", "Connecting to Spine at: {}", url);
144		*self.state.write().await = ConnectionState::Connecting;
145		*self.state.write().await = ConnectionState::Connected;
146		*self.last_heartbeat.write().await = chrono::Utc::now();
147		dev_log!("grpc", "Successfully connected to Spine");
148		Ok(())
149	}
150
151	/// Disconnect from the Spine service
152	pub async fn Disconnect(&mut self) -> Result<()> {
153		dev_log!("grpc", "Disconnecting from Spine");
154
155		#[cfg(feature = "grove_echo")]
156		{
157			self.echo_client = None;
158		}
159
160		*self.state.write().await = ConnectionState::Disconnected;
161		dev_log!("grpc", "Successfully disconnected from Spine");
162		Ok(())
163	}
164
165	/// Get the current connection state
166	pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
167
168	/// Send a request to the Spine service
169	///
170	/// # Arguments
171	///
172	/// * `method` - The method name to call
173	/// * `payload` - The request payload
174	pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
175		if self.GetState().await != ConnectionState::Connected {
176			return Err(anyhow::anyhow!("Not connected to Spine"));
177		}
178
179		dev_log!("grpc", "Sending request: {}", method);
180
181		let mut metrics = self.metrics.write().await;
182		metrics.total_requests += 1;
183		metrics.successful_requests += 1;
184		Ok(Vec::new())
185	}
186
187	/// Get the connection metrics
188	pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
189
190	/// Set the heartbeat configuration
191	pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
192}
193
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
	/// Open a gRPC channel to the configured `mountain_endpoint` and store an
	/// EchoAction client built on it.
	///
	/// # Errors
	///
	/// Returns an error when the endpoint is not a valid URI or when the
	/// channel cannot be established.
	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
		// The read guard is a temporary; the config lock is released before
		// the (potentially slow) connect below.
		let url = self.config.read().await.mountain_endpoint.clone();

		dev_log!("grpc", "Connecting EchoAction client to: {}", url);

		let channel = tonic::transport::Channel::from_shared(url)
			.context("Invalid Mountain URL")?
			.connect()
			.await
			.context("Failed to connect EchoAction client")?;

		self.echo_client = Some(EchoActionServiceClient::new(channel));
		dev_log!("grpc", "EchoAction client connected");
		Ok(())
	}

	/// Send a single `EchoAction` and return the service's response.
	///
	/// # Errors
	///
	/// Returns an error when:
	/// * the connection state is not `Connected`;
	/// * `ConnectEchoClient` has not been called successfully;
	/// * the gRPC call itself fails;
	/// * the response reports `success == false` (the response's `error` text
	///   is included in the message).
	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
		if self.GetState().await != ConnectionState::Connected {
			return Err(anyhow::anyhow!("Not connected to Spine"));
		}

		// Generated tonic client methods take `&mut self`. The client is a
		// handle over a clonable `Channel`, so clone it for this call instead
		// of borrowing it immutably (which does not compile) or forcing this
		// method to take `&mut self`.
		let mut client = self
			.echo_client
			.clone()
			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

		let response = client
			.send_echo_action(action)
			.await
			.context("Failed to send EchoAction")?
			.into_inner();

		if !response.success {
			anyhow::bail!("EchoAction failed: {}", response.error);
		}

		Ok(response)
	}

	/// Send an RPC-style call through the EchoAction channel.
	///
	/// Wraps `payload` in an `EchoAction` of type `"rpc"`, adds the method name
	/// to the headers under `"rpc_method"`, and returns the response's `result`
	/// bytes.
	///
	/// # Arguments
	///
	/// * `method` - RPC method name, stored in the `"rpc_method"` header
	/// * `payload` - Request body
	/// * `metadata` - Extra headers; an existing `"rpc_method"` entry is
	///   overwritten
	///
	/// # Errors
	///
	/// Propagates any error from [`Self::SendEchoAction`].
	pub async fn SendRpcViaEcho(
		&self,
		method:&str,
		payload:Vec<u8>,
		metadata:HashMap<String, String>,
	) -> Result<Vec<u8>> {
		let action = Self::BuildEchoAction("rpc", "rpc_method", method, payload, metadata);

		let response = self.SendEchoAction(action).await?;
		Ok(response.result)
	}

	/// Send an event through the EchoAction channel.
	///
	/// Wraps `payload` in an `EchoAction` of type `"event"`, adds the event
	/// name to the headers under `"event_name"`, and discards the response
	/// body.
	///
	/// # Arguments
	///
	/// * `event_name` - Event name, stored in the `"event_name"` header
	/// * `payload` - Event body
	/// * `metadata` - Extra headers; an existing `"event_name"` entry is
	///   overwritten
	///
	/// # Errors
	///
	/// Propagates any error from [`Self::SendEchoAction`].
	pub async fn SendEventViaEcho(
		&self,
		event_name:&str,
		payload:Vec<u8>,
		metadata:HashMap<String, String>,
	) -> Result<()> {
		let action = Self::BuildEchoAction("event", "event_name", event_name, payload, metadata);

		self.SendEchoAction(action).await?;
		Ok(())
	}

	/// Whether an EchoAction client has been created (and not yet dropped by
	/// `Disconnect`).
	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

	/// Build a grove → mountain `EchoAction` with a fresh UUID and the current
	/// UTC timestamp, inserting `header_key = header_value` into `headers`.
	fn BuildEchoAction(
		action_type:&str,
		header_key:&str,
		header_value:&str,
		payload:Vec<u8>,
		mut headers:HashMap<String, String>,
	) -> EchoAction {
		headers.insert(header_key.to_string(), header_value.to_string());

		EchoAction {
			action_id:uuid::Uuid::new_v4().to_string(),
			source:"grove".to_string(),
			target:"mountain".to_string(),
			action_type:action_type.to_string(),
			payload,
			headers,
			timestamp:chrono::Utc::now().timestamp(),
			nested_actions:vec![],
		}
	}
}
287
#[cfg(test)]
mod tests {
	use super::*;

	/// Build a connection configured for a local endpoint. `new` and
	/// `Connect` perform no network I/O, so no server is required.
	fn local_connection() -> SpineConnectionImpl {
		let config = ProtocolConfig { mountain_endpoint:"http://127.0.0.1:50051".to_string(), ..Default::default() };
		SpineConnectionImpl::new(config)
	}

	#[test]
	fn test_connection_state() {
		let state = ConnectionState::Connected;
		assert_eq!(state, ConnectionState::Connected);
		assert_ne!(state, ConnectionState::Disconnected);
	}

	#[test]
	fn test_heartbeat_config_default() {
		let config = HeartbeatConfig::default();
		assert_eq!(config.interval_seconds, 30);
		assert_eq!(config.max_missed, 3);
		assert!(config.enabled);
	}

	#[tokio::test]
	async fn test_spine_connection_creation() {
		let connection = local_connection();
		assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
	}

	#[tokio::test]
	async fn test_send_request_rejected_while_disconnected() {
		let connection = local_connection();

		let error = connection
			.SendRequest("ping", Vec::new())
			.await
			.expect_err("requests must be rejected before Connect");
		assert_eq!(error.to_string(), "Not connected to Spine");

		// A rejected request must not touch the counters.
		let metrics = connection.GetMetrics().await;
		assert_eq!(metrics.total_requests, 0);
		assert_eq!(metrics.successful_requests, 0);
	}

	#[tokio::test]
	async fn test_connect_send_disconnect_lifecycle() {
		let mut connection = local_connection();

		connection.Connect().await.expect("Connect should succeed");
		assert_eq!(connection.GetState().await, ConnectionState::Connected);

		let response = connection
			.SendRequest("ping", vec![1, 2, 3])
			.await
			.expect("SendRequest should succeed while connected");
		assert!(response.is_empty());

		let metrics = connection.GetMetrics().await;
		assert_eq!(metrics.total_requests, 1);
		assert_eq!(metrics.successful_requests, 1);
		assert_eq!(metrics.failed_requests, 0);

		connection.Disconnect().await.expect("Disconnect should succeed");
		assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
	}
}
311}