Files
ServiceManager/node/src/main.rs
2026-03-07 19:25:30 +01:00

643 lines
19 KiB
Rust

// src/main.rs
use std::{fs, time::Duration};
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use serde::{Serialize, Deserialize};
use std::process::Command;
use std::sync::{Arc};
use axum::{routing::post, routing::get, routing::delete, Json, Router, http::Method, extract::State, http::{Request, StatusCode},middleware::Next,response::Response,};
use tokio::task;
use tokio::time::{self, interval};
use tokio::sync::Mutex as AsyncMutex;
use clap::{Parser, Subcommand};
use reqwest;
use chrono::{DateTime, Utc};
use tower_http::cors::{Any, AllowOrigin, CorsLayer};
use inquire::{Text, Password};
use rand::{thread_rng, RngCore};
use base64::{engine::general_purpose, Engine as _};
use axum::extract::ConnectInfo;
use std::net::SocketAddr;
#[derive(Deserialize)]
struct DeleteRequest {
service_id: i32,
}
#[derive(Parser)]
#[command(name = "nodecli")]
#[command(about = "CLI to config node", long_about = None)]
struct Cli {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand, Debug)]
enum Commands {
Config,
Start,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StatusRecord {
pub timestamp: DateTime<Utc>,
pub status: u8,
}
impl Default for StatusRecord {
fn default() -> Self {
StatusRecord {
timestamp: Utc::now(),
status: 0,
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ProcessConfig {
#[serde(default)]
id: i32,
name: String,
command: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct ConfigData {
server_url : String,
node_name : String,
node_url : String,
api_key : String,
}
#[derive(Debug, Serialize, Deserialize)]
struct FileData {
node_id: i32,
processes: Vec<ProcessConfig>,
}
static PROCESSES_FILE: &str = "processes.json";
static CONFIG_FILE: &str = "config.json";
async fn list_processes(state: axum::extract::State<AppState>) -> Json<Vec<ProcessConfig>> {
let guard = state.processes.lock().await;
Json(guard.clone())
}
#[derive(Clone)]
struct AppState {
server_url : Arc<AsyncMutex<String>>,
node_name : Arc<AsyncMutex<String>>,
node_url : Arc<AsyncMutex<String>>,
api_key : Arc<AsyncMutex<String>>,
node_id: Arc<AsyncMutex<i32>>,
processes: Arc<AsyncMutex<Vec<ProcessConfig>>>,
current_api_key: Arc<AsyncMutex<ApiKey>>,
}
#[derive(Serialize)]
struct RegistrationRequest {
id: i32,
name: String,
address: String,
}
#[derive(Serialize)]
struct RegistrationServiceRequest {
node_id: i32,
service: ProcessConfig,
}
#[derive(Deserialize)]
struct RegisterResponse {
id: i32,
}
#[derive(Serialize)]
struct ServiceUpdateRequest {
service_id: i32,
service_status: StatusRecord,
}
#[derive(Serialize)]
struct UpdateRequest {
node_id: String,
#[serde(default)]
services: Vec<ServiceUpdateRequest>,
}
#[derive(Clone)]
struct ApiKey {
key: Arc<AsyncMutex<Option<String>>>,
timestamp_end: Arc<AsyncMutex<DateTime<Utc>>>
}
fn generate_secure_api_key() -> String {
let mut key_bytes = [0u8; 32];
thread_rng().fill_bytes(&mut key_bytes);
general_purpose::URL_SAFE_NO_PAD.encode(key_bytes)
}
async fn run_api_key_creation(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
state: axum::extract::State<AppState>,
) -> Result<String, StatusCode> {
if !addr.ip().is_loopback() {
println!("Error access denied");
return Err(StatusCode::FORBIDDEN);
}
{
let mut current_api_key= state.current_api_key.lock().await;
let mut key_guard = current_api_key.key.lock().await;
*key_guard = Some(generate_secure_api_key());
let mut time_guard = current_api_key.timestamp_end.lock().await;
*time_guard = Utc::now() + chrono::Duration::minutes(30);
if let Some(ref k) = *key_guard {
println!("Api enabled for 30min. Key: {}", k);
Ok(k.to_string())
} else {
println!("No key generated.");
return Err(StatusCode::FORBIDDEN);
}
}
}
fn run_config_wizard() {
println!("--- NodeCLI Configuration ---");
let serveur_url = Text::new("The url of the server :" )
.with_placeholder("ex: http://<addess>:<port>")
.prompt();
let node_name = Text::new("The name of this node : ")
.with_placeholder("ex: http://<addess>:<port>")
.prompt();
let node_url = Text::new("url of this node : ")
.with_placeholder("ex: http://<addess>:<port>")
.prompt();
let api_key = Password::new("The API key of the node:")
.with_display_mode(inquire::PasswordDisplayMode::Masked)
.prompt();
match (serveur_url, node_name, node_url, api_key) {
(Ok(s), Ok(n), Ok(nu), Ok(a)) => {
println!("\n✅ Configuration saved !");
println!("Server url : {}", s);
println!("Node name: {}", n);
println!("Node name: {}", nu);
println!("API key : **** (longueur: {})", a.len());
save_config_to_disk(&ConfigData{ server_url: s, node_name: n, node_url : nu, api_key: a })
}
_ => println!("\n❌ Configuration canceled."),
}
}
async fn add_process(
state: axum::extract::State<AppState>,
Json(payload): Json<ProcessConfig>,
) -> Json<String> {
let mut vec_guard= state.processes.lock().await.clone();
let id_guard = state.node_id.lock().await.clone();
let server_url = state.server_url.lock().await.clone();
let api_key = state.api_key.lock().await.clone();
let mut new_process = payload.clone();
let client = reqwest::Client::new();
let request_payload = RegistrationServiceRequest {
node_id: id_guard.clone(),
service: new_process.clone(),
};
let mut headers = HeaderMap::new();
headers.insert(
"X-Node-API-Key",
HeaderValue::from_str(&*api_key).expect("Invalid API Key format")
);
match client
.post(format!("{}/api/registerService", server_url))
.headers(headers)
.json(&request_payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
match response.json::<RegisterResponse>().await {
Ok(data) => {
new_process.id = data.id;
vec_guard.push(new_process.clone());
save_processes_to_disk(&FileData { node_id: data.id, processes: vec_guard.clone() });
return Json(format!("✅ Enregistrement réussi. ID du service attribué : {}", data.id));
}
Err(_) => { return Json(format!("Process error on json received")); },
}
} else {
return Json(format!("⚠️ Échec de l'enregistrement, statut: {}", response.status()));
}
}
Err(e) => {
return Json(format!("❌ Impossible de joindre le serveur Go : {}. Nouvelle tentative dans 5s...", e));
}
}
}
async fn delete_service(
state: axum::extract::State<AppState>,
Json(payload): Json<DeleteRequest>,
) -> impl axum::response::IntoResponse {
let mut guard = state.processes.lock().await.clone();
let id_guard= state.node_id.lock().await.clone();
let service_id = payload.service_id;
let index_to_remove = guard.iter().position(|service| {
service.id == service_id
});
match index_to_remove {
Some(index) => {
let removed_service = guard.remove(index);
println!("Service supprimé avec succès: {}", removed_service.name);
save_processes_to_disk(&FileData { node_id: id_guard.clone(), processes: guard.clone() });
return axum::http::StatusCode::NO_CONTENT;
}
None => {
println!("Erreur: Service ID {} non trouvé.", service_id);
return axum::http::StatusCode::NOT_FOUND;
}
}
}
async fn register_with_server(state: AppState) {
let mut id_guard: tokio::sync::MutexGuard<'_, i32> = state.node_id.lock().await;
let guard = state.processes.lock().await;
let server_url: tokio::sync::MutexGuard<'_, String> = state.server_url.lock().await;
let node_url: tokio::sync::MutexGuard<'_, String> = state.node_url.lock().await;
let node_name: tokio::sync::MutexGuard<'_, String> = state.node_name.lock().await;
let api_key: tokio::sync::MutexGuard<'_, String> = state.api_key.lock().await;
let address_full = format!( "{}", node_url );
eprintln!("full ip is {}", address_full);
let client = reqwest::Client::new();
let payload = RegistrationRequest {
id: id_guard.clone(),
name: node_name.to_string(),
address: address_full,
};
let mut headers = HeaderMap::new();
headers.insert(
"X-Node-API-Key",
HeaderValue::from_str(&*api_key).expect("Invalid API Key format")
);
loop {
match client
.post(format!("{}/api/register", server_url))
.headers(headers.clone())
.json(&payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
match response.json::<RegisterResponse>().await {
Ok(data) => {
println!("✅ Enregistrement réussi. ID du node attribué : {}", data.id);
*id_guard = data.id;
save_processes_to_disk(&FileData { node_id: data.id, processes: guard.clone() });
}
Err(_) => eprintln!("⚠️ Réponse réussie mais format JSON invalide"),
}
break;
} else {
eprintln!("⚠️ Échec de l'enregistrement, statut: {}", response.status());
}
}
Err(e) => {
eprintln!("❌ Impossible de joindre le serveur Go : {}. Nouvelle tentative dans 5s...", e);
}
}
time::sleep(Duration::from_secs(5)).await;
}
}
fn save_config_to_disk(data: &ConfigData) {
let json = serde_json::to_string_pretty(&data).unwrap();
fs::write(CONFIG_FILE, json).unwrap();
}
fn initialize_config_file() {
if !std::path::Path::new(CONFIG_FILE).exists() {
println!("No {} found, creating an empty one...", CONFIG_FILE);
std::fs::write(CONFIG_FILE.to_string(), "[]").expect("Failed to create config.json");
}
else
{
println!("{} found", CONFIG_FILE);
}
}
fn load_config_from_disk() -> ConfigData{
if let Ok(content) = fs::read_to_string(CONFIG_FILE) {
if let Ok(parsed) = serde_json::from_str::<ConfigData>(&content) {
return parsed;
}
}
ConfigData{ server_url: "".to_string(), node_name: "".to_string(), node_url : "".to_string(), api_key:"".to_string() }
}
fn save_processes_to_disk(data: &FileData) {
let json = serde_json::to_string_pretty(&data).unwrap();
fs::write(PROCESSES_FILE, json).unwrap();
}
fn initialize_processes_file() {
if !std::path::Path::new(PROCESSES_FILE).exists() {
println!("No processes.json found, creating an empty one...");
std::fs::write(PROCESSES_FILE, "[]").expect("Failed to create processes.json");
}
else
{
println!("{} found", PROCESSES_FILE);
}
}
fn load_processes_from_disk() -> FileData{
if let Ok(content) = fs::read_to_string(PROCESSES_FILE) {
if let Ok(parsed) = serde_json::from_str::<FileData>(&content) {
return parsed;
}
}
FileData{ node_id: 0, processes: vec![] }
}
fn check_process_running(cmd: &str) -> bool {
let output_result = Command::new("sh").arg("-c").arg(cmd).output();
match output_result {
Ok(output) => {
if !output.status.success() {
let stdout_str = match str::from_utf8(&output.stderr) {
Ok(s) => s,
Err(_) => "",
};
println!("error: {}",stdout_str.to_string());
return false;
}
let stdout_str = match str::from_utf8(&output.stdout) {
Ok(s) => s.trim(),
Err(_) => {
return false;
}
};
println!("result: {}",stdout_str.to_string());
if stdout_str == "1" {
return true;
} else if stdout_str == "0" {
return false;
} else {
return false;
}
}
Err(_) => {
return false;
}
}
}
async fn start_monitor(state: AppState) {
println!("Monitoring started");
let mut interval = interval(Duration::from_mins(1) );
interval.tick().await;
let node_name= state.node_name.lock().await.clone();
let server_url= state.server_url.lock().await.clone();
let api_key= state.api_key.lock().await.clone();
let mut headers = HeaderMap::new();
headers.insert(
"X-Node-API-Key",
HeaderValue::from_str(&api_key).unwrap()
);
loop {
{
interval.tick().await;
let mut update_request = UpdateRequest{ node_id: node_name.to_string(), services: Vec::new() };
let mut guard = state.processes.lock().await;
println!("--- Checking processes ---");
for p in guard.iter_mut() {
let running = check_process_running(&p.command);
println!("{} => {}", p.name, if running { "RUNNING" } else { "STOPPED" });
let mut status = StatusRecord{ status: 0, timestamp: Utc::now() };
if running == true {
status.status = 1;
}
update_request.services.push(ServiceUpdateRequest{ service_id: p.id.clone(), service_status: status.clone() });
}
if update_request.services.len() > 0 {
let client = reqwest::Client::new();
let payload = update_request;
match client
.post(format!("{}/api/updateServiceStatus", server_url ))
.headers(headers.clone())
.json(&payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
println!("✅ Update success");
} else {
eprintln!("⚠️ Update failed {}", response.status());
}
}
Err(e) => {
eprintln!("❌ Error : {}", e);
}
}
}
}
}
}
async fn api_key_auth(
State(state): State<ApiKey>,
req: axum::http::Request<axum::body::Body>,
next: Next,
) -> Result<Response, StatusCode> {
let auth_header = req.headers()
.get("X-Node-API-Key")
.and_then(|val| val.to_str().ok());
let (valid_key, timestamp_end) = {
let key_guard = state.key.lock().await;
let time_guard = state.timestamp_end.lock().await;
(key_guard.clone(), *time_guard)
};
match valid_key.clone() {
Some(valeur) => println!("La clé est : {}", valeur),
None => println!("Il n'y a pas de clé !"),
}
match auth_header.clone() {
Some(s) => println!("Checking api key : {}", s), // s est un &str
None => println!("Empty API key"),
}
match (auth_header, valid_key) {
(Some(sent_key), Some(stored_key))
if sent_key == stored_key && chrono::Utc::now() < timestamp_end =>
{
// Tout est bon, on passe à la suite (la fonction add_process)
println!("api key valid");
Ok(next.run(req).await)
}
_ => {
println!("Error api key not valid");
Err(StatusCode::UNAUTHORIZED)
}
}
}
#[tokio::main]
async fn main() {
initialize_processes_file();
initialize_config_file();
let initial_process = load_processes_from_disk();
println!("initial node_id: {}", initial_process.node_id);
let initial_config = load_config_from_disk();
println!("server url: {}", initial_config.server_url);
let cli = Cli::parse();
if let Some(Commands::Config) = &cli.command {
run_config_wizard();
println!("Configuration terminée. Relancez sans argument pour démarrer le service.");
return;
}
let api_key_state = ApiKey {
key: Arc::new(AsyncMutex::new(None)),
timestamp_end: Arc::new(AsyncMutex::new(Utc::now())),
};
let mut state = AppState {
server_url: Arc::new(AsyncMutex::new(initial_config.server_url)),
node_name: Arc::new(AsyncMutex::new(initial_config.node_name)),
node_url: Arc::new(AsyncMutex::new(initial_config.node_url)),
api_key: Arc::new(AsyncMutex::new(initial_config.api_key)),
node_id: Arc::new(AsyncMutex::new(initial_process.node_id)),
processes: Arc::new(AsyncMutex::new(initial_process.processes)),
current_api_key : Arc::new(AsyncMutex::new(api_key_state.clone())),
};
if let Some(Commands::Start) = &cli.command {
let client = reqwest::Client::new();
match client.post("http://localhost:8081/internal/generate-key").send().await {
Ok(response) => {
if let Ok(key) = response.text().await {
println!("🔑 Clé générée avec succès : {}", key);
}
},
Err(_) => {
println!("❌ Erreur : Impossible de joindre le service Node.");
println!("Vérifiez que 'nodecli start' est bien en cours d'exécution.");
}
}
return;
}
let state_register = state.clone();
tokio::task::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
register_with_server(state_register).await;
});
let state_monitor = state.clone();
task::spawn(async move {
start_monitor(state_monitor).await;
});
let state_for_cors = state.clone();
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::predicate(move |origin: &axum::http::HeaderValue, _parts: &axum::http::request::Parts| {
// Lecture du state de manière synchrone pour le CORS
let server_url = futures::executor::block_on(async {
state_for_cors.server_url.lock().await.clone()
});
// Comparaison (on vérifie si l'origine correspond à notre URL de config)
origin.as_bytes() == server_url.as_bytes()
}))
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
.allow_headers(Any);
let app = Router::new()
.route(
"/add",
post(add_process).route_layer(axum::middleware::from_fn_with_state(api_key_state, api_key_auth))
)
.route("/list", get(list_processes))
.route("/services", delete(delete_service))
.route("/internal/generate-key", post(run_api_key_creation))
.with_state(state)
.layer(cors);
let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await.unwrap();
println!("Node service running on port 8081");
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>()
)
.await
.unwrap();
}