Grove/Protocol/
SpineConnection.rs1use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41
42use crate::{Protocol::ProtocolConfig, dev_log};
43#[cfg(feature = "grove_echo")]
44use crate::vine::generated::vine::{
45 EchoAction,
46 EchoActionResponse,
47 echo_action_service_client::EchoActionServiceClient,
48};
49
50#[derive(Debug, Clone, Copy, PartialEq)]
52pub enum ConnectionState {
53 Disconnected,
55 Connecting,
57 Connected,
59 Error,
61}
62
63#[derive(Clone, Debug)]
65pub struct HeartbeatConfig {
66 pub interval_seconds:u64,
68 pub max_missed:u32,
70 pub enabled:bool,
72}
73
74impl Default for HeartbeatConfig {
76 fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
77}
78
79#[derive(Clone, Debug, Default)]
81pub struct ConnectionMetrics {
82 pub total_requests:u64,
84 pub successful_requests:u64,
86 pub failed_requests:u64,
88 pub uptime_seconds:u64,
90 pub reconnections:u64,
92}
93
94pub struct SpineConnectionImpl {
96 config:Arc<RwLock<ProtocolConfig>>,
98 state:Arc<RwLock<ConnectionState>>,
100
101 #[cfg(feature = "grove_echo")]
102 echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
104
105 heartbeat_config:HeartbeatConfig,
107 last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
109 metrics:Arc<RwLock<ConnectionMetrics>>,
111}
112
113impl SpineConnectionImpl {
114 pub fn new(config:ProtocolConfig) -> Self {
124 Self {
125 config:Arc::new(RwLock::new(config)),
126 state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
127
128 #[cfg(feature = "grove_echo")]
129 echo_client:None,
130
131 heartbeat_config:HeartbeatConfig::default(),
132 last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
133 metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
134 }
135 }
136
137 pub async fn Connect(&mut self) -> Result<()> {
139 let guard = self.config.read().await;
140 let url = guard.mountain_endpoint.clone();
141 drop(guard);
142
143 dev_log!("grpc", "Connecting to Spine at: {}", url);
144 *self.state.write().await = ConnectionState::Connecting;
145 *self.state.write().await = ConnectionState::Connected;
146 *self.last_heartbeat.write().await = chrono::Utc::now();
147 dev_log!("grpc", "Successfully connected to Spine");
148 Ok(())
149 }
150
151 pub async fn Disconnect(&mut self) -> Result<()> {
153 dev_log!("grpc", "Disconnecting from Spine");
154
155 #[cfg(feature = "grove_echo")]
156 {
157 self.echo_client = None;
158 }
159
160 *self.state.write().await = ConnectionState::Disconnected;
161 dev_log!("grpc", "Successfully disconnected from Spine");
162 Ok(())
163 }
164
165 pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
167
168 pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
175 if self.GetState().await != ConnectionState::Connected {
176 return Err(anyhow::anyhow!("Not connected to Spine"));
177 }
178
179 dev_log!("grpc", "Sending request: {}", method);
180
181 let mut metrics = self.metrics.write().await;
182 metrics.total_requests += 1;
183 metrics.successful_requests += 1;
184 Ok(Vec::new())
185 }
186
187 pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
189
190 pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
192}
193
194#[cfg(feature = "grove_echo")]
195impl SpineConnectionImpl {
196 pub async fn ConnectEchoClient(&mut self) -> Result<()> {
197 let guard = self.config.read().await;
198 let url = guard.mountain_endpoint.clone();
199 drop(guard);
200
201 dev_log!("grpc", "Connecting EchoAction client to: {}", url);
202
203 let channel = tonic::transport::Channel::from_shared(url)
204 .context("Invalid Mountain URL")?
205 .connect()
206 .await
207 .context("Failed to connect EchoAction client")?;
208
209 self.echo_client = Some(EchoActionServiceClient::new(channel));
210 dev_log!("grpc", "EchoAction client connected");
211 Ok(())
212 }
213
214 pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
215 if self.GetState().await != ConnectionState::Connected {
216 return Err(anyhow::anyhow!("Not connected to Spine"));
217 }
218
219 let client = self
220 .echo_client
221 .as_ref()
222 .ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;
223
224 let response = client
225 .send_echo_action(action)
226 .await
227 .context("Failed to send EchoAction")?
228 .into_inner();
229
230 if !response.success {
231 anyhow::bail!("EchoAction failed: {}", response.error);
232 }
233
234 Ok(response)
235 }
236
237 pub async fn SendRpcViaEcho(
238 &self,
239 method:&str,
240 payload:Vec<u8>,
241 metadata:HashMap<String, String>,
242 ) -> Result<Vec<u8>> {
243 let mut headers = metadata;
244 headers.insert("rpc_method".to_string(), method.to_string());
245
246 let action = EchoAction {
247 action_id:uuid::Uuid::new_v4().to_string(),
248 source:"grove".to_string(),
249 target:"mountain".to_string(),
250 action_type:"rpc".to_string(),
251 payload,
252 headers,
253 timestamp:chrono::Utc::now().timestamp(),
254 nested_actions:vec![],
255 };
256
257 let response = self.SendEchoAction(action).await?;
258 Ok(response.result)
259 }
260
261 pub async fn SendEventViaEcho(
262 &self,
263 event_name:&str,
264 payload:Vec<u8>,
265 metadata:HashMap<String, String>,
266 ) -> Result<()> {
267 let mut headers = metadata;
268 headers.insert("event_name".to_string(), event_name.to_string());
269
270 let action = EchoAction {
271 action_id:uuid::Uuid::new_v4().to_string(),
272 source:"grove".to_string(),
273 target:"mountain".to_string(),
274 action_type:"event".to_string(),
275 payload,
276 headers,
277 timestamp:chrono::Utc::now().timestamp(),
278 nested_actions:vec![],
279 };
280
281 self.SendEchoAction(action).await?;
282 Ok(())
283 }
284
285 pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291
292 #[test]
293 fn test_connection_state() {
294 let state = ConnectionState::Connected;
295 assert_eq!(state, ConnectionState::Connected);
296 }
297
298 #[test]
299 fn test_heartbeat_config_default() {
300 let config = HeartbeatConfig::default();
301 assert_eq!(config.interval_seconds, 30);
302 assert!(config.enabled);
303 }
304
305 #[tokio::test]
306 async fn test_spine_connection_creation() {
307 let config = ProtocolConfig { mountain_endpoint:"http://127.0.0.1:50051".to_string(), ..Default::default() };
308 let connection = SpineConnectionImpl::new(config);
309 assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
310 }
311}