Files
felhom.eu/hub/internal/store/telemetry.go
T

501 lines
14 KiB
Go

package store
import (
"database/sql"
"encoding/json"
"regexp"
"strings"
"time"
)
// Patterns that fingerprintIssue strips from a log message before deriving its
// fingerprint. None of them is anchored, so every occurrence in the message is
// removed, not just a leading one.
var (
// reANSI matches ANSI colour/style escape sequences: ESC, "[", any run of
// digits and semicolons, then "m".
reANSI = regexp.MustCompile(`\x1b\[[0-9;]*m`)
// reTimestamp matches a numeric date-time such as "2024-01-02T15:04:05" or
// "2024/01/02 15:04:05", followed by an optional fractional part, an optional
// +hh:mm / +hhmm style offset, and an optional trailing "Z" or space, ":" and space.
reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`)
// reSyslog matches a syslog-style stamp such as "Jan  2 15:04:05 " (capitalised
// three-letter word, day of month, time, trailing space).
reSyslog = regexp.MustCompile(`[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2} `)
)
// AppTelemetryRecord holds per-app telemetry received from a controller report.
// SaveAppTelemetry writes one app_telemetry row per record; Containers and
// Issues are serialised to JSON text for that row, and each entry of Issues is
// additionally upserted into app_log_issues.
type AppTelemetryRecord struct {
// AppName identifies the app; together with a message fingerprint it keys
// the app_log_issues upsert.
AppName string `json:"app_name"`
// DisplayName is stored alongside AppName (presumably a human-readable label).
DisplayName string `json:"display_name"`
// Containers is stored verbatim as a JSON array in containers_json.
Containers []string `json:"containers"`
// Memory figures; presumably megabytes, going by the field names.
MemoryCurrentMB float64 `json:"memory_current_mb"`
MemoryAvgMB float64 `json:"memory_avg_mb"`
MemoryPeakMB float64 `json:"memory_peak_mb"`
// CPUAvgPercent is stored in cpu_avg_percent.
CPUAvgPercent float64 `json:"cpu_avg_percent"`
// CatalogEstimate and CatalogLimit are opaque strings stored as received.
CatalogEstimate string `json:"catalog_estimate"`
CatalogLimit string `json:"catalog_limit"`
// LogErrors and LogWarnings are the counters summed by the reporting queries.
LogErrors int `json:"log_errors"`
LogWarnings int `json:"log_warnings"`
// Issues lists log issues attached to this record; the field is omitted
// from JSON output when empty.
Issues []struct {
// Severity is stored only when the issue row is first inserted.
Severity string `json:"severity"`
// Message is the raw log text; its fingerprint identifies the issue.
Message string `json:"message"`
// Count is part of the payload but is not read by the code in this file.
Count int `json:"count"`
// LastSeen feeds first_seen/last_seen of the issue row.
LastSeen time.Time `json:"last_seen"`
} `json:"issues,omitempty"`
}
// FleetAppSummary holds fleet-wide aggregate stats for one app, as produced by
// GetFleetAppSummary from the app_telemetry rows inside the query window.
type FleetAppSummary struct {
// AppName is the grouping key.
AppName string
// DisplayName is MAX(display_name) over the group.
DisplayName string
// DeploymentCount is the number of distinct customer IDs reporting the app.
DeploymentCount int
// AvgMemoryMB is AVG(memory_avg_mb).
AvgMemoryMB float64
// PeakMemoryMB is MAX(memory_peak_mb).
PeakMemoryMB float64
// P95MemoryMB is filled in by calcP95Memory; it stays 0 when that lookup fails.
P95MemoryMB float64
// AvgCPU is AVG(cpu_avg_percent).
AvgCPU float64
// TotalErrors and TotalWarnings are SUM(log_errors) and SUM(log_warnings).
TotalErrors int
TotalWarnings int
// CatalogEstimate and CatalogLimit are the MAX() of the stored strings.
CatalogEstimate string
CatalogLimit string
}
// AppTelemetryPoint holds one time-series data point for chart rendering.
// GetAppTelemetryHistory fills it from a single app_telemetry row without
// aggregation.
type AppTelemetryPoint struct {
// ReportedAt is the row's reported_at value.
ReportedAt time.Time
// CustomerID is the customer the row belongs to.
CustomerID string
// Memory and CPU values are copied from the row as stored.
MemoryAvgMB float64
MemoryPeakMB float64
CPUAvgPercent float64
// LogErrors and LogWarnings are the row's counters.
LogErrors int
LogWarnings int
}
// AppCustomerStats holds per-customer resource stats for an app, as produced
// by GetAppCustomerBreakdown (one value per customer_id group).
type AppCustomerStats struct {
// CustomerID is the grouping key.
CustomerID string
// AvgMemoryMB is AVG(memory_avg_mb) for the customer.
AvgMemoryMB float64
// PeakMemoryMB is MAX(memory_peak_mb) for the customer.
PeakMemoryMB float64
// AvgCPU is AVG(cpu_avg_percent) for the customer.
AvgCPU float64
// TotalErrors is SUM(log_errors) for the customer.
TotalErrors int
// LastReport is MAX(reported_at) for the customer.
LastReport time.Time
}
// CustomerAppSummary holds per-app stats for a specific customer, as produced
// by GetCustomerAppSummary.
type CustomerAppSummary struct {
// AppName is the grouping key.
AppName string
// DisplayName is MAX(display_name) over the group.
DisplayName string
// MemoryCurrentMB comes from the customer's most recent row for the app;
// it stays 0 when that lookup yields nothing.
MemoryCurrentMB float64
// MemoryAvgMB is AVG(memory_avg_mb) over the query window.
MemoryAvgMB float64
// MemoryPeakMB is MAX(memory_peak_mb) over the query window.
MemoryPeakMB float64
// CatalogLimit is MAX(catalog_limit) over the query window.
CatalogLimit string
// LogErrors and LogWarnings are sums over the query window.
LogErrors int
LogWarnings int
}
// AppIssue holds a known log issue for an app across the fleet. It mirrors one
// row of app_log_issues, which is unique per (app_name, fingerprint).
type AppIssue struct {
// ID is the row id; DeleteAppIssuesByIDs deletes by this value.
ID int
// AppName is the app the issue was reported for.
AppName string
// Fingerprint is the normalised message key produced by fingerprintIssue.
Fingerprint string
// Severity and Message are stored when the row is first inserted; later
// upserts do not overwrite them.
Severity string
Message string
// FirstSeen is set on insert; LastSeen only ever moves forward on upsert.
FirstSeen time.Time
LastSeen time.Time
// OccurrenceCount starts at 1 and is incremented on every subsequent upsert.
OccurrenceCount int
// AffectedCustomers is decoded from the JSON array in affected_customers.
AffectedCustomers []string
}
// SaveAppTelemetry persists one controller report for customerID.
//
// All records are written inside a single transaction: one app_telemetry row
// per record, followed by an upsert into app_log_issues for each of the
// record's issues. A failed telemetry insert aborts the whole report; a failed
// issue upsert is only logged, and the remaining work continues. An empty
// records slice is a no-op and returns nil.
func (s *Store) SaveAppTelemetry(customerID string, reportedAt time.Time, records []AppTelemetryRecord) error {
	if len(records) == 0 {
		return nil
	}

	const insertSQL = `
INSERT INTO app_telemetry
(customer_id, app_name, display_name, reported_at,
memory_current_mb, memory_avg_mb, memory_peak_mb, cpu_avg_percent,
catalog_estimate, catalog_limit, log_errors, log_warnings,
containers_json, issues_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	// Covers every early return below; its result is ignored, so calling it
	// after a successful Commit is harmless.
	defer tx.Rollback()

	insert, err := tx.Prepare(insertSQL)
	if err != nil {
		return err
	}
	defer insert.Close()

	for i := range records {
		rec := &records[i]

		// Marshal errors are ignored here, exactly as the stored columns have
		// always been produced.
		containers, _ := json.Marshal(rec.Containers)
		issues, _ := json.Marshal(rec.Issues)

		_, err := insert.Exec(
			customerID, rec.AppName, rec.DisplayName, reportedAt,
			rec.MemoryCurrentMB, rec.MemoryAvgMB, rec.MemoryPeakMB, rec.CPUAvgPercent,
			rec.CatalogEstimate, rec.CatalogLimit, rec.LogErrors, rec.LogWarnings,
			string(containers), string(issues),
		)
		if err != nil {
			return err
		}

		// Fold each reported issue into the fleet-wide issue table.
		for _, issue := range rec.Issues {
			key := fingerprintIssue(issue.Message)
			upsertErr := upsertAppIssue(tx, rec.AppName, key, issue.Severity, issue.Message, customerID, issue.LastSeen)
			if upsertErr == nil {
				continue
			}
			// Only the first 20 bytes of the fingerprint go into the log line.
			s.logger.Printf("[WARN] upsertAppIssue %s/%s: %v", rec.AppName, key[:min(len(key), 20)], upsertErr)
		}
	}

	return tx.Commit()
}
// upsertAppIssue inserts or updates the known-issue row for
// (appName, fingerprint) and records customerID as affected.
//
// On first sight the row is created with occurrence_count 1 and first_seen and
// last_seen both set to lastSeen. On conflict the count is incremented and
// last_seen only moves forward; severity and message are left as first stored.
// The affected_customers JSON array is then read back and customerID appended
// if it is not already present.
//
// Every database error is returned, including failures while maintaining
// affected_customers, so the caller decides how serious it is;
// SaveAppTelemetry logs it as a warning and carries on.
func upsertAppIssue(tx *sql.Tx, appName, fingerprint, severity, message, customerID string, lastSeen time.Time) error {
	_, err := tx.Exec(`
INSERT INTO app_log_issues (app_name, fingerprint, severity, message, first_seen, last_seen, occurrence_count, affected_customers)
VALUES (?, ?, ?, ?, ?, ?, 1, ?)
ON CONFLICT(app_name, fingerprint) DO UPDATE SET
last_seen = CASE WHEN excluded.last_seen > last_seen THEN excluded.last_seen ELSE last_seen END,
occurrence_count = occurrence_count + 1`,
		appName, fingerprint, severity, message, lastSeen, lastSeen, `[]`)
	if err != nil {
		return err
	}

	// Read-modify-write of the affected_customers list. The row must exist at
	// this point, so a failure here is a real database problem.
	var affectedJSON string
	err = tx.QueryRow(`SELECT affected_customers FROM app_log_issues WHERE app_name = ? AND fingerprint = ?`,
		appName, fingerprint).Scan(&affectedJSON)
	if err != nil {
		return err
	}

	var customers []string
	// A malformed stored list is deliberately tolerated: whatever decoded is
	// kept, and the write below replaces the column with well-formed JSON.
	_ = json.Unmarshal([]byte(affectedJSON), &customers)

	for _, c := range customers {
		if c == customerID {
			return nil // already recorded; nothing to write
		}
	}

	customers = append(customers, customerID)
	newJSON, err := json.Marshal(customers)
	if err != nil {
		return err
	}
	_, err = tx.Exec(`UPDATE app_log_issues SET affected_customers = ? WHERE app_name = ? AND fingerprint = ?`,
		string(newJSON), appName, fingerprint)
	return err
}
// fingerprintIssue derives the key used to group occurrences of the same log
// message.
//
// ANSI colour codes, numeric timestamps and syslog-style stamps are removed
// (see reANSI, reTimestamp, reSyslog), surrounding whitespace is trimmed and
// the text is lower-cased. The result is capped at 100 bytes; the cut never
// lands inside a multi-byte UTF-8 character, so a long message may yield a
// slightly shorter key rather than one ending in a broken character.
func fingerprintIssue(msg string) string {
	const maxLen = 100

	s := reANSI.ReplaceAllString(msg, "")
	s = reTimestamp.ReplaceAllString(s, "")
	s = reSyslog.ReplaceAllString(s, "")
	s = strings.ToLower(strings.TrimSpace(s))

	if len(s) > maxLen {
		cut := maxLen
		// Bytes of the form 10xxxxxx are UTF-8 continuation bytes. If the byte
		// at the cut is one, the cut falls mid-character, so step back to the
		// byte that starts that character.
		for cut > 0 && s[cut]&0xC0 == 0x80 {
			cut--
		}
		s = s[:cut]
	}
	return s
}
// min reports the lesser of a and b; when they are equal that shared value is
// returned.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// GetFleetAppSummary returns one aggregate per app over all telemetry reported
// strictly after since, ordered by deployment count and then average memory
// (both descending).
//
// Each summary's P95MemoryMB is filled in with a follow-up query per app (see
// calcP95Memory). The call is best-effort at row level: a row that cannot be
// scanned is skipped and a failed P95 lookup leaves the field at 0, but both
// are now reported through the store logger instead of vanishing silently.
// Query and iteration errors are returned.
func (s *Store) GetFleetAppSummary(since time.Time) ([]FleetAppSummary, error) {
	rows, err := s.db.Query(`
SELECT app_name,
MAX(display_name) as display_name,
COUNT(DISTINCT customer_id) as deployment_count,
AVG(memory_avg_mb) as avg_memory_mb,
MAX(memory_peak_mb) as peak_memory_mb,
AVG(cpu_avg_percent) as avg_cpu,
SUM(log_errors) as total_errors,
SUM(log_warnings) as total_warnings,
MAX(catalog_estimate) as catalog_estimate,
MAX(catalog_limit) as catalog_limit
FROM app_telemetry
WHERE reported_at > ?
GROUP BY app_name
ORDER BY deployment_count DESC, avg_memory_mb DESC`, since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var (
		summaries []FleetAppSummary
		skipped   int   // rows dropped because Scan failed
		firstErr  error // first Scan failure, kept for the log line
	)
	for rows.Next() {
		var fs FleetAppSummary
		if err := rows.Scan(
			&fs.AppName, &fs.DisplayName, &fs.DeploymentCount,
			&fs.AvgMemoryMB, &fs.PeakMemoryMB, &fs.AvgCPU,
			&fs.TotalErrors, &fs.TotalWarnings,
			&fs.CatalogEstimate, &fs.CatalogLimit,
		); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			skipped++
			continue
		}
		summaries = append(summaries, fs)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if skipped > 0 {
		// One line per call rather than one per row, to keep the log readable.
		s.logger.Printf("[WARN] GetFleetAppSummary: skipped %d unreadable row(s), first error: %v", skipped, firstErr)
	}

	// Calculate P95 memory per app.
	for i := range summaries {
		p95, err := s.calcP95Memory(summaries[i].AppName, since)
		if err != nil {
			s.logger.Printf("[WARN] GetFleetAppSummary: P95 memory for %s: %v", summaries[i].AppName, err)
			continue
		}
		summaries[i].P95MemoryMB = p95
	}
	return summaries, nil
}
// calcP95Memory approximates the 95th percentile of memory_peak_mb for appName
// over rows reported strictly after since.
//
// It counts the matching rows, then fetches the single row at zero-based
// position int(count*0.95) in ascending memory order (clamped to the last
// row). With no matching rows it returns 0 and a nil error.
func (s *Store) calcP95Memory(appName string, since time.Time) (float64, error) {
	const countSQL = `SELECT COUNT(*) FROM app_telemetry WHERE app_name = ? AND reported_at > ?`

	var total int
	if err := s.db.QueryRow(countSQL, appName, since).Scan(&total); err != nil {
		return 0, err
	}
	if total == 0 {
		return 0, nil
	}

	// Position of the percentile row in the sorted result, never past the end.
	idx := int(float64(total) * 0.95)
	if last := total - 1; idx > last {
		idx = last
	}

	row := s.db.QueryRow(`
SELECT memory_peak_mb FROM app_telemetry
WHERE app_name = ? AND reported_at > ?
ORDER BY memory_peak_mb ASC
LIMIT 1 OFFSET ?`, appName, since, idx)

	var value float64
	if err := row.Scan(&value); err != nil {
		return 0, err
	}
	return value, nil
}
// GetAppTelemetryHistory returns the stored telemetry rows for appName that
// were reported strictly after since, oldest first, one point per row.
//
// A row that cannot be scanned is skipped so one bad record does not hide the
// rest of the series; the number of skipped rows and the first failure are
// written to the store logger. Query errors are returned; an iteration error
// is returned together with the points read so far.
func (s *Store) GetAppTelemetryHistory(appName string, since time.Time) ([]AppTelemetryPoint, error) {
	rows, err := s.db.Query(`
SELECT reported_at, customer_id, memory_avg_mb, memory_peak_mb, cpu_avg_percent, log_errors, log_warnings
FROM app_telemetry
WHERE app_name = ? AND reported_at > ?
ORDER BY reported_at ASC`, appName, since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var (
		points   []AppTelemetryPoint
		skipped  int   // rows dropped because Scan failed
		firstErr error // first Scan failure, kept for the log line
	)
	for rows.Next() {
		var p AppTelemetryPoint
		if err := rows.Scan(&p.ReportedAt, &p.CustomerID, &p.MemoryAvgMB, &p.MemoryPeakMB,
			&p.CPUAvgPercent, &p.LogErrors, &p.LogWarnings); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			skipped++
			continue
		}
		points = append(points, p)
	}
	if skipped > 0 {
		// One line per call rather than one per row, to keep the log readable.
		s.logger.Printf("[WARN] GetAppTelemetryHistory %s: skipped %d unreadable row(s), first error: %v",
			appName, skipped, firstErr)
	}
	return points, rows.Err()
}
// GetAppCustomerBreakdown returns per-customer aggregates for appName over
// rows reported strictly after since, ordered by average memory (highest
// first).
//
// A row that cannot be scanned is skipped; the number of skipped rows and the
// first failure are written to the store logger so that a systematic scan
// problem shows up instead of producing a silently empty result. Query errors
// are returned; an iteration error is returned together with the rows read so
// far.
func (s *Store) GetAppCustomerBreakdown(appName string, since time.Time) ([]AppCustomerStats, error) {
	rows, err := s.db.Query(`
SELECT customer_id, AVG(memory_avg_mb), MAX(memory_peak_mb), AVG(cpu_avg_percent),
SUM(log_errors), MAX(reported_at)
FROM app_telemetry
WHERE app_name = ? AND reported_at > ?
GROUP BY customer_id
ORDER BY AVG(memory_avg_mb) DESC`, appName, since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var (
		stats    []AppCustomerStats
		skipped  int   // rows dropped because Scan failed
		firstErr error // first Scan failure, kept for the log line
	)
	for rows.Next() {
		var cs AppCustomerStats
		// NOTE(review): MAX(reported_at) is an expression, not a plain column.
		// Some drivers only convert declared date/time columns to time.Time,
		// so confirm this Scan succeeds with the driver in use.
		if err := rows.Scan(&cs.CustomerID, &cs.AvgMemoryMB, &cs.PeakMemoryMB, &cs.AvgCPU,
			&cs.TotalErrors, &cs.LastReport); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			skipped++
			continue
		}
		stats = append(stats, cs)
	}
	if skipped > 0 {
		// One line per call rather than one per row, to keep the log readable.
		s.logger.Printf("[WARN] GetAppCustomerBreakdown %s: skipped %d unreadable row(s), first error: %v",
			appName, skipped, firstErr)
	}
	return stats, rows.Err()
}
// GetCustomerAppSummary returns one summary per app that customerID reported
// strictly after since, ordered by average memory (highest first). The result
// is never nil; it is empty when nothing matches.
//
// Averages, peaks and log totals cover the window starting at since.
// MemoryCurrentMB is taken from the customer's most recent row for each app
// and is filled in on a best-effort basis: if that second lookup fails the
// summaries are still returned with MemoryCurrentMB left at 0, and the failure
// is written to the store logger. Rows that cannot be scanned are skipped and
// logged the same way. Errors from the main aggregate query are returned.
func (s *Store) GetCustomerAppSummary(customerID string, since time.Time) ([]CustomerAppSummary, error) {
	// Averages, peaks and totals over the requested window.
	rows, err := s.db.Query(`
SELECT app_name, MAX(display_name), AVG(memory_avg_mb), MAX(memory_peak_mb),
MAX(catalog_limit), SUM(log_errors), SUM(log_warnings)
FROM app_telemetry
WHERE customer_id = ? AND reported_at > ?
GROUP BY app_name
ORDER BY AVG(memory_avg_mb) DESC`, customerID, since)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make([]CustomerAppSummary, 0)
	indexByApp := make(map[string]int) // app name -> position in result
	var (
		skipped  int   // rows dropped because Scan failed
		firstErr error // first Scan failure, kept for the log line
	)
	for rows.Next() {
		var cs CustomerAppSummary
		if err := rows.Scan(&cs.AppName, &cs.DisplayName, &cs.MemoryAvgMB, &cs.MemoryPeakMB,
			&cs.CatalogLimit, &cs.LogErrors, &cs.LogWarnings); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			skipped++
			continue
		}
		indexByApp[cs.AppName] = len(result)
		result = append(result, cs)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if skipped > 0 {
		s.logger.Printf("[WARN] GetCustomerAppSummary %s: skipped %d unreadable row(s), first error: %v",
			customerID, skipped, firstErr)
	}

	// Get current memory from the most recent report per app.
	currentRows, err := s.db.Query(`
SELECT app_name, memory_current_mb
FROM app_telemetry
WHERE customer_id = ? AND reported_at = (
SELECT MAX(reported_at) FROM app_telemetry at2
WHERE at2.customer_id = ? AND at2.app_name = app_telemetry.app_name
)`, customerID, customerID)
	if err != nil {
		s.logger.Printf("[WARN] GetCustomerAppSummary %s: current memory lookup: %v", customerID, err)
		return result, nil
	}
	defer currentRows.Close()

	skipped, firstErr = 0, nil
	for currentRows.Next() {
		var (
			appName   string
			currentMB float64
		)
		if err := currentRows.Scan(&appName, &currentMB); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			skipped++
			continue
		}
		// Apps outside the summary window have no entry and are ignored.
		if i, ok := indexByApp[appName]; ok {
			result[i].MemoryCurrentMB = currentMB
		}
	}
	if err := currentRows.Err(); err != nil {
		s.logger.Printf("[WARN] GetCustomerAppSummary %s: current memory lookup: %v", customerID, err)
	}
	if skipped > 0 {
		s.logger.Printf("[WARN] GetCustomerAppSummary %s: skipped %d unreadable current-memory row(s), first error: %v",
			customerID, skipped, firstErr)
	}
	return result, nil
}
// GetAppIssues returns up to limit known issues for appName, most recently
// seen first.
func (s *Store) GetAppIssues(appName string, limit int) ([]AppIssue, error) {
	const query = `
SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen,
occurrence_count, affected_customers
FROM app_log_issues
WHERE app_name = ?
ORDER BY last_seen DESC
LIMIT ?`

	rows, queryErr := s.db.Query(query, appName, limit)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	issues, err := scanAppIssues(rows)
	return issues, err
}
// GetRecentIssuesAllApps returns up to limit known issues regardless of app,
// most recently seen first.
func (s *Store) GetRecentIssuesAllApps(limit int) ([]AppIssue, error) {
	const query = `
SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen,
occurrence_count, affected_customers
FROM app_log_issues
ORDER BY last_seen DESC
LIMIT ?`

	rows, queryErr := s.db.Query(query, limit)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	issues, err := scanAppIssues(rows)
	return issues, err
}
// scanAppIssues drains rows into AppIssue values. It expects the column order
// used by GetAppIssues and GetRecentIssuesAllApps, ending with the
// affected_customers JSON text.
//
// A row that fails to scan is skipped without being reported, and a malformed
// affected_customers value leaves AffectedCustomers with whatever could be
// decoded. The caller remains responsible for closing rows. The returned error
// is rows.Err().
func scanAppIssues(rows *sql.Rows) ([]AppIssue, error) {
	var out []AppIssue
	for rows.Next() {
		var (
			issue        AppIssue
			rawCustomers string
		)
		scanErr := rows.Scan(&issue.ID, &issue.AppName, &issue.Fingerprint, &issue.Severity, &issue.Message,
			&issue.FirstSeen, &issue.LastSeen, &issue.OccurrenceCount, &rawCustomers)
		if scanErr == nil {
			json.Unmarshal([]byte(rawCustomers), &issue.AffectedCustomers)
			out = append(out, issue)
		}
	}
	return out, rows.Err()
}
// PruneAppTelemetry deletes every app_telemetry row reported strictly before
// the given time and returns the number of rows the driver reports as
// affected.
func (s *Store) PruneAppTelemetry(before time.Time) (int64, error) {
	const query = "DELETE FROM app_telemetry WHERE reported_at < ?"

	result, err := s.db.Exec(query, before)
	if err == nil {
		return result.RowsAffected()
	}
	return 0, err
}
// PruneStaleIssues deletes every app_log_issues row whose last_seen is
// strictly before notSeenSince and returns the number of rows the driver
// reports as affected.
func (s *Store) PruneStaleIssues(notSeenSince time.Time) (int64, error) {
	const query = "DELETE FROM app_log_issues WHERE last_seen < ?"

	result, err := s.db.Exec(query, notSeenSince)
	if err == nil {
		return result.RowsAffected()
	}
	return 0, err
}
// DeleteAppTelemetry deletes every app_telemetry row for appName, across all
// customers, and returns the number of rows the driver reports as affected.
func (s *Store) DeleteAppTelemetry(appName string) (int64, error) {
	const query = "DELETE FROM app_telemetry WHERE app_name = ?"

	result, err := s.db.Exec(query, appName)
	if err == nil {
		return result.RowsAffected()
	}
	return 0, err
}
// DeleteAppIssues removes all known-issue records for a specific app and
// zeroes the error/warning counters in that app's telemetry history.
//
// Both statements run in one transaction, so the issue list and the counters
// cannot end up disagreeing: if either statement or the commit fails, nothing
// is changed and the error is returned. On success the result is the number of
// issue rows deleted, as reported by the driver.
func (s *Store) DeleteAppIssues(appName string) (int64, error) {
	tx, err := s.db.Begin()
	if err != nil {
		return 0, err
	}
	// Covers every early return below; its result is ignored, so calling it
	// after a successful Commit is harmless.
	defer tx.Rollback()

	res, err := tx.Exec("DELETE FROM app_log_issues WHERE app_name = ?", appName)
	if err != nil {
		return 0, err
	}
	if _, err := tx.Exec("UPDATE app_telemetry SET log_errors = 0, log_warnings = 0 WHERE app_name = ?", appName); err != nil {
		return 0, err
	}

	// Read the count before committing, but do not let a driver that cannot
	// report it undo work that otherwise succeeded.
	deleted, countErr := res.RowsAffected()
	if err := tx.Commit(); err != nil {
		return 0, err
	}
	return deleted, countErr
}
// DeleteAppIssuesByIDs deletes the app_log_issues rows whose id is listed in
// ids and returns the number of rows the driver reports as affected. An empty
// list is a no-op that returns 0 without touching the database.
//
// The ids are bound as parameters, one "?" placeholder per id.
func (s *Store) DeleteAppIssuesByIDs(ids []int) (int64, error) {
	n := len(ids)
	if n == 0 {
		return 0, nil
	}

	args := make([]interface{}, 0, n)
	for _, id := range ids {
		args = append(args, id)
	}

	// "?" followed by n-1 copies of ",?" gives the comma-separated list.
	marks := "?" + strings.Repeat(",?", n-1)

	result, err := s.db.Exec("DELETE FROM app_log_issues WHERE id IN ("+marks+")", args...)
	if err == nil {
		return result.RowsAffected()
	}
	return 0, err
}