package store

import (
	"database/sql"
	"encoding/json"
	"time"
)

// AppTelemetryRecord holds per-app telemetry received from a controller report.
//
// SaveAppTelemetry writes one app_telemetry row per record. Containers and
// Issues are stored as JSON text (containers_json / issues_json); every other
// field is written to the column whose name matches its JSON tag.
//
// NOTE(review): the "MB" suffix presumably means megabytes and CPUAvgPercent a
// 0-100 style percentage — confirm against the controller that emits these.
type AppTelemetryRecord struct {
	AppName         string   `json:"app_name"`
	DisplayName     string   `json:"display_name"`
	Containers      []string `json:"containers"`
	MemoryCurrentMB float64  `json:"memory_current_mb"`
	MemoryAvgMB     float64  `json:"memory_avg_mb"`
	MemoryPeakMB    float64  `json:"memory_peak_mb"`
	CPUAvgPercent   float64  `json:"cpu_avg_percent"`
	CatalogEstimate string   `json:"catalog_estimate"`
	CatalogLimit    string   `json:"catalog_limit"`
	LogErrors       int      `json:"log_errors"`
	LogWarnings     int      `json:"log_warnings"`
	// Issues lists log issues observed for the app. Besides being stored in
	// issues_json, each entry's Severity, Message and LastSeen are passed to
	// upsertAppIssue by SaveAppTelemetry; Count is only kept inside issues_json.
	Issues []struct {
		Severity string    `json:"severity"`
		Message  string    `json:"message"`
		Count    int       `json:"count"`
		LastSeen time.Time `json:"last_seen"`
	} `json:"issues,omitempty"`
}

// FleetAppSummary holds fleet-wide aggregate stats for one app.
//
// Values are produced by GetFleetAppSummary, which groups app_telemetry rows
// by app_name.
type FleetAppSummary struct {
	AppName         string
	DisplayName     string  // MAX(display_name) within the group
	DeploymentCount int     // COUNT(DISTINCT customer_id)
	AvgMemoryMB     float64 // AVG(memory_avg_mb)
	PeakMemoryMB    float64 // MAX(memory_peak_mb)
	P95MemoryMB     float64 // from calcP95Memory; stays 0 when that lookup fails
	AvgCPU          float64 // AVG(cpu_avg_percent)
	TotalErrors     int     // SUM(log_errors)
	TotalWarnings   int     // SUM(log_warnings)
	CatalogEstimate string  // MAX(catalog_estimate)
	CatalogLimit    string  // MAX(catalog_limit)
}

// AppTelemetryPoint holds one time-series data point for chart rendering.
//
// Each point mirrors a single app_telemetry row as selected by
// GetAppTelemetryHistory.
type AppTelemetryPoint struct {
	ReportedAt    time.Time
	CustomerID    string
	MemoryAvgMB   float64
	MemoryPeakMB  float64
	CPUAvgPercent float64
	LogErrors     int
	LogWarnings   int
}

// AppCustomerStats holds per-customer resource stats for an app.
//
// Values are produced by GetAppCustomerBreakdown, which groups app_telemetry
// rows for one app by customer_id.
type AppCustomerStats struct {
	CustomerID   string
	AvgMemoryMB  float64   // AVG(memory_avg_mb)
	PeakMemoryMB float64   // MAX(memory_peak_mb)
	AvgCPU       float64   // AVG(cpu_avg_percent)
	TotalErrors  int       // SUM(log_errors)
	LastReport   time.Time // MAX(reported_at)
}

// CustomerAppSummary holds per-app stats for a specific customer.
//
// Values are produced by GetCustomerAppSummary. MemoryCurrentMB comes from the
// customer's most recent row for the app; the remaining numeric fields are
// aggregates over the requested time window.
type CustomerAppSummary struct {
	AppName         string
	DisplayName     string
	MemoryCurrentMB float64
	MemoryAvgMB     float64
	MemoryPeakMB    float64
	CatalogLimit    string
	LogErrors       int
	LogWarnings     int
}

// AppIssue holds a known log issue for an app across the fleet.
type AppIssue struct { ID int AppName string Fingerprint string Severity string Message string FirstSeen time.Time LastSeen time.Time OccurrenceCount int AffectedCustomers []string } // SaveAppTelemetry inserts telemetry records for a customer report into the database. func (s *Store) SaveAppTelemetry(customerID string, reportedAt time.Time, records []AppTelemetryRecord) error { if len(records) == 0 { return nil } tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() stmt, err := tx.Prepare(` INSERT INTO app_telemetry (customer_id, app_name, display_name, reported_at, memory_current_mb, memory_avg_mb, memory_peak_mb, cpu_avg_percent, catalog_estimate, catalog_limit, log_errors, log_warnings, containers_json, issues_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) if err != nil { return err } defer stmt.Close() for _, r := range records { containersJSON, _ := json.Marshal(r.Containers) issuesJSON, _ := json.Marshal(r.Issues) if _, err := stmt.Exec( customerID, r.AppName, r.DisplayName, reportedAt, r.MemoryCurrentMB, r.MemoryAvgMB, r.MemoryPeakMB, r.CPUAvgPercent, r.CatalogEstimate, r.CatalogLimit, r.LogErrors, r.LogWarnings, string(containersJSON), string(issuesJSON), ); err != nil { return err } // Upsert log issues for _, issue := range r.Issues { fp := fingerprintIssue(issue.Message) if err := upsertAppIssue(tx, r.AppName, fp, issue.Severity, issue.Message, customerID, issue.LastSeen); err != nil { s.logger.Printf("[WARN] upsertAppIssue %s/%s: %v", r.AppName, fp[:min(len(fp), 20)], err) } } } return tx.Commit() } // upsertAppIssue inserts or updates a known log issue record. func upsertAppIssue(tx *sql.Tx, appName, fingerprint, severity, message, customerID string, lastSeen time.Time) error { // Try insert first _, err := tx.Exec(` INSERT INTO app_log_issues (app_name, fingerprint, severity, message, first_seen, last_seen, occurrence_count, affected_customers) VALUES (?, ?, ?, ?, ?, ?, 1, ?) 
ON CONFLICT(app_name, fingerprint) DO UPDATE SET last_seen = CASE WHEN excluded.last_seen > last_seen THEN excluded.last_seen ELSE last_seen END, occurrence_count = occurrence_count + 1`, appName, fingerprint, severity, message, lastSeen, lastSeen, `[]`) if err != nil { return err } // Update affected_customers JSON — read current, add if missing, write back var affectedJSON string err = tx.QueryRow(`SELECT affected_customers FROM app_log_issues WHERE app_name = ? AND fingerprint = ?`, appName, fingerprint).Scan(&affectedJSON) if err != nil { return nil // non-critical } var customers []string json.Unmarshal([]byte(affectedJSON), &customers) found := false for _, c := range customers { if c == customerID { found = true break } } if !found { customers = append(customers, customerID) newJSON, _ := json.Marshal(customers) tx.Exec(`UPDATE app_log_issues SET affected_customers = ? WHERE app_name = ? AND fingerprint = ?`, string(newJSON), appName, fingerprint) } return nil } // fingerprintIssue creates a short fingerprint key from a message string. func fingerprintIssue(msg string) string { if len(msg) > 100 { msg = msg[:100] } return msg } // min returns the smaller of two ints. func min(a, b int) int { if a < b { return a } return b } // GetFleetAppSummary returns fleet-wide aggregate stats for all apps since the given time. func (s *Store) GetFleetAppSummary(since time.Time) ([]FleetAppSummary, error) { rows, err := s.db.Query(` SELECT app_name, MAX(display_name) as display_name, COUNT(DISTINCT customer_id) as deployment_count, AVG(memory_avg_mb) as avg_memory_mb, MAX(memory_peak_mb) as peak_memory_mb, AVG(cpu_avg_percent) as avg_cpu, SUM(log_errors) as total_errors, SUM(log_warnings) as total_warnings, MAX(catalog_estimate) as catalog_estimate, MAX(catalog_limit) as catalog_limit FROM app_telemetry WHERE reported_at > ? 
GROUP BY app_name ORDER BY deployment_count DESC, avg_memory_mb DESC`, since) if err != nil { return nil, err } defer rows.Close() var summaries []FleetAppSummary for rows.Next() { var fs FleetAppSummary if err := rows.Scan( &fs.AppName, &fs.DisplayName, &fs.DeploymentCount, &fs.AvgMemoryMB, &fs.PeakMemoryMB, &fs.AvgCPU, &fs.TotalErrors, &fs.TotalWarnings, &fs.CatalogEstimate, &fs.CatalogLimit, ); err != nil { continue } summaries = append(summaries, fs) } if err := rows.Err(); err != nil { return nil, err } // Calculate P95 memory per app for i := range summaries { p95, err := s.calcP95Memory(summaries[i].AppName, since) if err == nil { summaries[i].P95MemoryMB = p95 } } return summaries, nil } // calcP95Memory returns the approximate P95 memory_peak_mb for an app since the given time. func (s *Store) calcP95Memory(appName string, since time.Time) (float64, error) { // Get count first var count int err := s.db.QueryRow(`SELECT COUNT(*) FROM app_telemetry WHERE app_name = ? AND reported_at > ?`, appName, since).Scan(&count) if err != nil || count == 0 { return 0, err } offset := int(float64(count) * 0.95) if offset >= count { offset = count - 1 } var p95 float64 err = s.db.QueryRow(` SELECT memory_peak_mb FROM app_telemetry WHERE app_name = ? AND reported_at > ? ORDER BY memory_peak_mb ASC LIMIT 1 OFFSET ?`, appName, since, offset).Scan(&p95) if err != nil { return 0, err } return p95, nil } // GetAppTelemetryHistory returns time-series telemetry for a specific app. func (s *Store) GetAppTelemetryHistory(appName string, since time.Time) ([]AppTelemetryPoint, error) { rows, err := s.db.Query(` SELECT reported_at, customer_id, memory_avg_mb, memory_peak_mb, cpu_avg_percent, log_errors, log_warnings FROM app_telemetry WHERE app_name = ? AND reported_at > ? 
ORDER BY reported_at ASC`, appName, since) if err != nil { return nil, err } defer rows.Close() var points []AppTelemetryPoint for rows.Next() { var p AppTelemetryPoint if err := rows.Scan(&p.ReportedAt, &p.CustomerID, &p.MemoryAvgMB, &p.MemoryPeakMB, &p.CPUAvgPercent, &p.LogErrors, &p.LogWarnings); err != nil { continue } points = append(points, p) } return points, rows.Err() } // GetAppCustomerBreakdown returns per-customer resource stats for an app. func (s *Store) GetAppCustomerBreakdown(appName string, since time.Time) ([]AppCustomerStats, error) { rows, err := s.db.Query(` SELECT customer_id, AVG(memory_avg_mb), MAX(memory_peak_mb), AVG(cpu_avg_percent), SUM(log_errors), MAX(reported_at) FROM app_telemetry WHERE app_name = ? AND reported_at > ? GROUP BY customer_id ORDER BY AVG(memory_avg_mb) DESC`, appName, since) if err != nil { return nil, err } defer rows.Close() var stats []AppCustomerStats for rows.Next() { var cs AppCustomerStats if err := rows.Scan(&cs.CustomerID, &cs.AvgMemoryMB, &cs.PeakMemoryMB, &cs.AvgCPU, &cs.TotalErrors, &cs.LastReport); err != nil { continue } stats = append(stats, cs) } return stats, rows.Err() } // GetCustomerAppSummary returns per-app telemetry summary for a specific customer. func (s *Store) GetCustomerAppSummary(customerID string, since time.Time) ([]CustomerAppSummary, error) { // 7-day averages/peaks rows, err := s.db.Query(` SELECT app_name, MAX(display_name), AVG(memory_avg_mb), MAX(memory_peak_mb), MAX(catalog_limit), SUM(log_errors), SUM(log_warnings) FROM app_telemetry WHERE customer_id = ? AND reported_at > ? 
GROUP BY app_name ORDER BY AVG(memory_avg_mb) DESC`, customerID, since) if err != nil { return nil, err } defer rows.Close() summaryMap := make(map[string]*CustomerAppSummary) var order []string for rows.Next() { var cs CustomerAppSummary if err := rows.Scan(&cs.AppName, &cs.DisplayName, &cs.MemoryAvgMB, &cs.MemoryPeakMB, &cs.CatalogLimit, &cs.LogErrors, &cs.LogWarnings); err != nil { continue } summaryMap[cs.AppName] = &cs order = append(order, cs.AppName) } if err := rows.Err(); err != nil { return nil, err } // Get current memory from most recent report per app currentRows, err := s.db.Query(` SELECT app_name, memory_current_mb FROM app_telemetry WHERE customer_id = ? AND reported_at = ( SELECT MAX(reported_at) FROM app_telemetry at2 WHERE at2.customer_id = ? AND at2.app_name = app_telemetry.app_name )`, customerID, customerID) if err == nil { defer currentRows.Close() for currentRows.Next() { var appName string var currentMB float64 if err := currentRows.Scan(&appName, ¤tMB); err == nil { if s, ok := summaryMap[appName]; ok { s.MemoryCurrentMB = currentMB } } } } result := make([]CustomerAppSummary, 0, len(order)) for _, name := range order { if s, ok := summaryMap[name]; ok { result = append(result, *s) } } return result, nil } // GetAppIssues returns recent known issues for a specific app. func (s *Store) GetAppIssues(appName string, limit int) ([]AppIssue, error) { rows, err := s.db.Query(` SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen, occurrence_count, affected_customers FROM app_log_issues WHERE app_name = ? ORDER BY last_seen DESC LIMIT ?`, appName, limit) if err != nil { return nil, err } defer rows.Close() return scanAppIssues(rows) } // GetRecentIssuesAllApps returns the most recently seen issues across all apps. 
func (s *Store) GetRecentIssuesAllApps(limit int) ([]AppIssue, error) { rows, err := s.db.Query(` SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen, occurrence_count, affected_customers FROM app_log_issues ORDER BY last_seen DESC LIMIT ?`, limit) if err != nil { return nil, err } defer rows.Close() return scanAppIssues(rows) } func scanAppIssues(rows *sql.Rows) ([]AppIssue, error) { var issues []AppIssue for rows.Next() { var ai AppIssue var affectedJSON string if err := rows.Scan(&ai.ID, &ai.AppName, &ai.Fingerprint, &ai.Severity, &ai.Message, &ai.FirstSeen, &ai.LastSeen, &ai.OccurrenceCount, &affectedJSON); err != nil { continue } json.Unmarshal([]byte(affectedJSON), &ai.AffectedCustomers) issues = append(issues, ai) } return issues, rows.Err() } // PruneAppTelemetry removes telemetry rows older than the given time. func (s *Store) PruneAppTelemetry(before time.Time) (int64, error) { res, err := s.db.Exec("DELETE FROM app_telemetry WHERE reported_at < ?", before) if err != nil { return 0, err } return res.RowsAffected() } // PruneStaleIssues removes issue records not seen since the given time. func (s *Store) PruneStaleIssues(notSeenSince time.Time) (int64, error) { res, err := s.db.Exec("DELETE FROM app_log_issues WHERE last_seen < ?", notSeenSince) if err != nil { return 0, err } return res.RowsAffected() } // DeleteAppTelemetry removes all telemetry records for a specific app. func (s *Store) DeleteAppTelemetry(appName string) (int64, error) { res, err := s.db.Exec("DELETE FROM app_telemetry WHERE app_name = ?", appName) if err != nil { return 0, err } return res.RowsAffected() } // DeleteAppIssues removes all known-issue records for a specific app. func (s *Store) DeleteAppIssues(appName string) (int64, error) { res, err := s.db.Exec("DELETE FROM app_log_issues WHERE app_name = ?", appName) if err != nil { return 0, err } return res.RowsAffected() }