7860f96a56
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
499 lines
14 KiB
Go
499 lines
14 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// reANSI matches ANSI SGR escape sequences (ESC '[' digits/semicolons 'm'),
// i.e. terminal color and style codes.
var reANSI = regexp.MustCompile(`\x1b\[[0-9;]*m`)

// reTimestamp matches an ISO-8601-like date-time: a date with '-' or '/'
// separators, a 'T' or space, hh:mm:ss, optional fractional digits, an
// optional numeric zone offset, and an optional trailing 'Z' or space, colon
// and space.
var reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`)

// reSyslog matches a syslog-style timestamp prefix such as "Jan  2 15:04:05 ".
var reSyslog = regexp.MustCompile(`[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2} `)
|
|
|
|
// AppTelemetryRecord is the per-app telemetry payload carried by a controller
// report. SaveAppTelemetry writes one app_telemetry row per record and upserts
// each entry of Issues into app_log_issues.
type AppTelemetryRecord struct {
	// Identity of the app.
	AppName     string `json:"app_name"`
	DisplayName string `json:"display_name"`

	// Containers is persisted as a JSON array in the containers_json column.
	Containers []string `json:"containers"`

	// Resource usage figures (units as given by the field names).
	MemoryCurrentMB float64 `json:"memory_current_mb"`
	MemoryAvgMB     float64 `json:"memory_avg_mb"`
	MemoryPeakMB    float64 `json:"memory_peak_mb"`
	CPUAvgPercent   float64 `json:"cpu_avg_percent"`

	// Catalog values are stored verbatim as strings.
	CatalogEstimate string `json:"catalog_estimate"`
	CatalogLimit    string `json:"catalog_limit"`

	// Log line counters for the report.
	LogErrors   int `json:"log_errors"`
	LogWarnings int `json:"log_warnings"`

	// Issues lists the individual log issues in the report; the key is
	// omitted from JSON output when the slice is empty.
	Issues []struct {
		Severity string    `json:"severity"`
		Message  string    `json:"message"`
		Count    int       `json:"count"`
		LastSeen time.Time `json:"last_seen"`
	} `json:"issues,omitempty"`
}
|
|
|
|
// FleetAppSummary is one row of the fleet-wide per-app aggregate produced by
// GetFleetAppSummary.
type FleetAppSummary struct {
	AppName, DisplayName string

	// DeploymentCount is the number of distinct customers reporting the app.
	DeploymentCount int

	// AvgMemoryMB and AvgCPU are averages; PeakMemoryMB is the maximum
	// reported peak; P95MemoryMB is filled in by calcP95Memory.
	AvgMemoryMB, PeakMemoryMB, P95MemoryMB, AvgCPU float64

	// Summed log counters across all matching telemetry rows.
	TotalErrors, TotalWarnings int

	CatalogEstimate, CatalogLimit string
}
|
|
|
|
// AppTelemetryPoint is a single time-series sample for one app and one
// customer, as returned by GetAppTelemetryHistory for chart rendering.
type AppTelemetryPoint struct {
	ReportedAt time.Time
	CustomerID string

	MemoryAvgMB, MemoryPeakMB, CPUAvgPercent float64

	LogErrors, LogWarnings int
}
|
|
|
|
// AppCustomerStats is one customer's aggregated resource usage for a single
// app, as returned by GetAppCustomerBreakdown.
type AppCustomerStats struct {
	CustomerID string

	AvgMemoryMB, PeakMemoryMB, AvgCPU float64

	TotalErrors int

	// LastReport is the most recent reported_at among the aggregated rows.
	LastReport time.Time
}
|
|
|
|
// CustomerAppSummary is one app's telemetry summary for a single customer, as
// returned by GetCustomerAppSummary.
type CustomerAppSummary struct {
	AppName, DisplayName string

	// MemoryCurrentMB comes from the customer's most recent report for the
	// app; the average and peak are aggregated over the queried window.
	MemoryCurrentMB, MemoryAvgMB, MemoryPeakMB float64

	CatalogLimit string

	LogErrors, LogWarnings int
}
|
|
|
|
// AppIssue is a known log issue for an app, tracked fleet-wide in the
// app_log_issues table and keyed by (AppName, Fingerprint).
type AppIssue struct {
	ID int

	AppName, Fingerprint, Severity, Message string

	FirstSeen, LastSeen time.Time

	OccurrenceCount int

	// AffectedCustomers is decoded from the JSON array stored in the
	// affected_customers column.
	AffectedCustomers []string
}
|
|
|
|
// SaveAppTelemetry inserts telemetry records for a customer report into the database.
|
|
func (s *Store) SaveAppTelemetry(customerID string, reportedAt time.Time, records []AppTelemetryRecord) error {
|
|
if len(records) == 0 {
|
|
return nil
|
|
}
|
|
|
|
tx, err := s.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
stmt, err := tx.Prepare(`
|
|
INSERT INTO app_telemetry
|
|
(customer_id, app_name, display_name, reported_at,
|
|
memory_current_mb, memory_avg_mb, memory_peak_mb, cpu_avg_percent,
|
|
catalog_estimate, catalog_limit, log_errors, log_warnings,
|
|
containers_json, issues_json)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, r := range records {
|
|
containersJSON, _ := json.Marshal(r.Containers)
|
|
issuesJSON, _ := json.Marshal(r.Issues)
|
|
|
|
if _, err := stmt.Exec(
|
|
customerID, r.AppName, r.DisplayName, reportedAt,
|
|
r.MemoryCurrentMB, r.MemoryAvgMB, r.MemoryPeakMB, r.CPUAvgPercent,
|
|
r.CatalogEstimate, r.CatalogLimit, r.LogErrors, r.LogWarnings,
|
|
string(containersJSON), string(issuesJSON),
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Upsert log issues
|
|
for _, issue := range r.Issues {
|
|
fp := fingerprintIssue(issue.Message)
|
|
if err := upsertAppIssue(tx, r.AppName, fp, issue.Severity, issue.Message, customerID, issue.LastSeen); err != nil {
|
|
s.logger.Printf("[WARN] upsertAppIssue %s/%s: %v", r.AppName, fp[:min(len(fp), 20)], err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// upsertAppIssue inserts or updates a known log issue record.
//
// The row is keyed by (appName, fingerprint). On first sight it is inserted
// with occurrence_count 1 and an empty affected_customers list. On conflict
// the occurrence count is incremented and last_seen is advanced only if the
// new value is later. customerID is then added to the row's affected_customers
// JSON array if it is not already present.
//
// Errors from the upsert itself and from writing the updated customer list are
// returned. A failure to read the current customer list is treated as
// non-critical and yields nil.
func upsertAppIssue(tx *sql.Tx, appName, fingerprint, severity, message, customerID string, lastSeen time.Time) error {
	if _, err := tx.Exec(`
		INSERT INTO app_log_issues (app_name, fingerprint, severity, message, first_seen, last_seen, occurrence_count, affected_customers)
		VALUES (?, ?, ?, ?, ?, ?, 1, ?)
		ON CONFLICT(app_name, fingerprint) DO UPDATE SET
			last_seen = CASE WHEN excluded.last_seen > last_seen THEN excluded.last_seen ELSE last_seen END,
			occurrence_count = occurrence_count + 1`,
		appName, fingerprint, severity, message, lastSeen, lastSeen, `[]`); err != nil {
		return err
	}

	// Update affected_customers JSON: read current, add if missing, write back.
	var affectedJSON string
	if err := tx.QueryRow(`SELECT affected_customers FROM app_log_issues WHERE app_name = ? AND fingerprint = ?`,
		appName, fingerprint).Scan(&affectedJSON); err != nil {
		return nil // non-critical
	}

	// A decode failure is deliberately ignored: an unreadable stored list
	// leaves customers empty and the write below replaces it with a fresh
	// one-element list.
	var customers []string
	_ = json.Unmarshal([]byte(affectedJSON), &customers)

	for _, c := range customers {
		if c == customerID {
			return nil // already recorded
		}
	}

	updatedJSON, err := json.Marshal(append(customers, customerID))
	if err != nil {
		return err
	}
	// The result of this write used to be discarded; report it so a lost
	// update is visible to the caller.
	_, err = tx.Exec(`UPDATE app_log_issues SET affected_customers = ? WHERE app_name = ? AND fingerprint = ?`,
		string(updatedJSON), appName, fingerprint)
	return err
}
|
|
|
|
// fingerprintIssue creates a short fingerprint key from a message string.
|
|
func fingerprintIssue(msg string) string {
|
|
s := reANSI.ReplaceAllString(msg, "")
|
|
s = reTimestamp.ReplaceAllString(s, "")
|
|
s = reSyslog.ReplaceAllString(s, "")
|
|
s = strings.TrimSpace(s)
|
|
s = strings.ToLower(s)
|
|
if len(s) > 100 {
|
|
s = s[:100]
|
|
}
|
|
return s
|
|
}
|
|
|
|
// min returns the smaller of two ints; when they are equal, that value is
// returned. On Go 1.21 and later this package-level function takes precedence
// over the built-in min within the package.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
|
|
|
|
// GetFleetAppSummary returns fleet-wide aggregate stats for all apps since the given time.
|
|
func (s *Store) GetFleetAppSummary(since time.Time) ([]FleetAppSummary, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT app_name,
|
|
MAX(display_name) as display_name,
|
|
COUNT(DISTINCT customer_id) as deployment_count,
|
|
AVG(memory_avg_mb) as avg_memory_mb,
|
|
MAX(memory_peak_mb) as peak_memory_mb,
|
|
AVG(cpu_avg_percent) as avg_cpu,
|
|
SUM(log_errors) as total_errors,
|
|
SUM(log_warnings) as total_warnings,
|
|
MAX(catalog_estimate) as catalog_estimate,
|
|
MAX(catalog_limit) as catalog_limit
|
|
FROM app_telemetry
|
|
WHERE reported_at > ?
|
|
GROUP BY app_name
|
|
ORDER BY deployment_count DESC, avg_memory_mb DESC`, since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var summaries []FleetAppSummary
|
|
for rows.Next() {
|
|
var fs FleetAppSummary
|
|
if err := rows.Scan(
|
|
&fs.AppName, &fs.DisplayName, &fs.DeploymentCount,
|
|
&fs.AvgMemoryMB, &fs.PeakMemoryMB, &fs.AvgCPU,
|
|
&fs.TotalErrors, &fs.TotalWarnings,
|
|
&fs.CatalogEstimate, &fs.CatalogLimit,
|
|
); err != nil {
|
|
continue
|
|
}
|
|
summaries = append(summaries, fs)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Calculate P95 memory per app
|
|
for i := range summaries {
|
|
p95, err := s.calcP95Memory(summaries[i].AppName, since)
|
|
if err == nil {
|
|
summaries[i].P95MemoryMB = p95
|
|
}
|
|
}
|
|
|
|
return summaries, nil
|
|
}
|
|
|
|
// calcP95Memory returns the approximate P95 memory_peak_mb for an app since the given time.
|
|
func (s *Store) calcP95Memory(appName string, since time.Time) (float64, error) {
|
|
// Get count first
|
|
var count int
|
|
err := s.db.QueryRow(`SELECT COUNT(*) FROM app_telemetry WHERE app_name = ? AND reported_at > ?`,
|
|
appName, since).Scan(&count)
|
|
if err != nil || count == 0 {
|
|
return 0, err
|
|
}
|
|
|
|
offset := int(float64(count) * 0.95)
|
|
if offset >= count {
|
|
offset = count - 1
|
|
}
|
|
|
|
var p95 float64
|
|
err = s.db.QueryRow(`
|
|
SELECT memory_peak_mb FROM app_telemetry
|
|
WHERE app_name = ? AND reported_at > ?
|
|
ORDER BY memory_peak_mb ASC
|
|
LIMIT 1 OFFSET ?`, appName, since, offset).Scan(&p95)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return p95, nil
|
|
}
|
|
|
|
// GetAppTelemetryHistory returns time-series telemetry for a specific app.
|
|
func (s *Store) GetAppTelemetryHistory(appName string, since time.Time) ([]AppTelemetryPoint, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT reported_at, customer_id, memory_avg_mb, memory_peak_mb, cpu_avg_percent, log_errors, log_warnings
|
|
FROM app_telemetry
|
|
WHERE app_name = ? AND reported_at > ?
|
|
ORDER BY reported_at ASC`, appName, since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var points []AppTelemetryPoint
|
|
for rows.Next() {
|
|
var p AppTelemetryPoint
|
|
if err := rows.Scan(&p.ReportedAt, &p.CustomerID, &p.MemoryAvgMB, &p.MemoryPeakMB,
|
|
&p.CPUAvgPercent, &p.LogErrors, &p.LogWarnings); err != nil {
|
|
continue
|
|
}
|
|
points = append(points, p)
|
|
}
|
|
return points, rows.Err()
|
|
}
|
|
|
|
// GetAppCustomerBreakdown returns per-customer resource stats for an app.
|
|
func (s *Store) GetAppCustomerBreakdown(appName string, since time.Time) ([]AppCustomerStats, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT customer_id, AVG(memory_avg_mb), MAX(memory_peak_mb), AVG(cpu_avg_percent),
|
|
SUM(log_errors), MAX(reported_at)
|
|
FROM app_telemetry
|
|
WHERE app_name = ? AND reported_at > ?
|
|
GROUP BY customer_id
|
|
ORDER BY AVG(memory_avg_mb) DESC`, appName, since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var stats []AppCustomerStats
|
|
for rows.Next() {
|
|
var cs AppCustomerStats
|
|
if err := rows.Scan(&cs.CustomerID, &cs.AvgMemoryMB, &cs.PeakMemoryMB, &cs.AvgCPU,
|
|
&cs.TotalErrors, &cs.LastReport); err != nil {
|
|
continue
|
|
}
|
|
stats = append(stats, cs)
|
|
}
|
|
return stats, rows.Err()
|
|
}
|
|
|
|
// GetCustomerAppSummary returns per-app telemetry summary for a specific customer.
|
|
func (s *Store) GetCustomerAppSummary(customerID string, since time.Time) ([]CustomerAppSummary, error) {
|
|
// 7-day averages/peaks
|
|
rows, err := s.db.Query(`
|
|
SELECT app_name, MAX(display_name), AVG(memory_avg_mb), MAX(memory_peak_mb),
|
|
MAX(catalog_limit), SUM(log_errors), SUM(log_warnings)
|
|
FROM app_telemetry
|
|
WHERE customer_id = ? AND reported_at > ?
|
|
GROUP BY app_name
|
|
ORDER BY AVG(memory_avg_mb) DESC`, customerID, since)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
summaryMap := make(map[string]*CustomerAppSummary)
|
|
var order []string
|
|
for rows.Next() {
|
|
var cs CustomerAppSummary
|
|
if err := rows.Scan(&cs.AppName, &cs.DisplayName, &cs.MemoryAvgMB, &cs.MemoryPeakMB,
|
|
&cs.CatalogLimit, &cs.LogErrors, &cs.LogWarnings); err != nil {
|
|
continue
|
|
}
|
|
summaryMap[cs.AppName] = &cs
|
|
order = append(order, cs.AppName)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get current memory from most recent report per app
|
|
currentRows, err := s.db.Query(`
|
|
SELECT app_name, memory_current_mb
|
|
FROM app_telemetry
|
|
WHERE customer_id = ? AND reported_at = (
|
|
SELECT MAX(reported_at) FROM app_telemetry at2
|
|
WHERE at2.customer_id = ? AND at2.app_name = app_telemetry.app_name
|
|
)`, customerID, customerID)
|
|
if err == nil {
|
|
defer currentRows.Close()
|
|
for currentRows.Next() {
|
|
var appName string
|
|
var currentMB float64
|
|
if err := currentRows.Scan(&appName, ¤tMB); err == nil {
|
|
if s, ok := summaryMap[appName]; ok {
|
|
s.MemoryCurrentMB = currentMB
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
result := make([]CustomerAppSummary, 0, len(order))
|
|
for _, name := range order {
|
|
if s, ok := summaryMap[name]; ok {
|
|
result = append(result, *s)
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetAppIssues returns recent known issues for a specific app.
|
|
func (s *Store) GetAppIssues(appName string, limit int) ([]AppIssue, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen,
|
|
occurrence_count, affected_customers
|
|
FROM app_log_issues
|
|
WHERE app_name = ?
|
|
ORDER BY last_seen DESC
|
|
LIMIT ?`, appName, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanAppIssues(rows)
|
|
}
|
|
|
|
// GetRecentIssuesAllApps returns the most recently seen issues across all apps.
|
|
func (s *Store) GetRecentIssuesAllApps(limit int) ([]AppIssue, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT id, app_name, fingerprint, severity, message, first_seen, last_seen,
|
|
occurrence_count, affected_customers
|
|
FROM app_log_issues
|
|
ORDER BY last_seen DESC
|
|
LIMIT ?`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanAppIssues(rows)
|
|
}
|
|
|
|
func scanAppIssues(rows *sql.Rows) ([]AppIssue, error) {
|
|
var issues []AppIssue
|
|
for rows.Next() {
|
|
var ai AppIssue
|
|
var affectedJSON string
|
|
if err := rows.Scan(&ai.ID, &ai.AppName, &ai.Fingerprint, &ai.Severity, &ai.Message,
|
|
&ai.FirstSeen, &ai.LastSeen, &ai.OccurrenceCount, &affectedJSON); err != nil {
|
|
continue
|
|
}
|
|
json.Unmarshal([]byte(affectedJSON), &ai.AffectedCustomers)
|
|
issues = append(issues, ai)
|
|
}
|
|
return issues, rows.Err()
|
|
}
|
|
|
|
// PruneAppTelemetry removes telemetry rows older than the given time.
|
|
func (s *Store) PruneAppTelemetry(before time.Time) (int64, error) {
|
|
res, err := s.db.Exec("DELETE FROM app_telemetry WHERE reported_at < ?", before)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return res.RowsAffected()
|
|
}
|
|
|
|
// PruneStaleIssues removes issue records not seen since the given time.
|
|
func (s *Store) PruneStaleIssues(notSeenSince time.Time) (int64, error) {
|
|
res, err := s.db.Exec("DELETE FROM app_log_issues WHERE last_seen < ?", notSeenSince)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return res.RowsAffected()
|
|
}
|
|
|
|
// DeleteAppTelemetry removes all telemetry records for a specific app.
|
|
func (s *Store) DeleteAppTelemetry(appName string) (int64, error) {
|
|
res, err := s.db.Exec("DELETE FROM app_telemetry WHERE app_name = ?", appName)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return res.RowsAffected()
|
|
}
|
|
|
|
// DeleteAppIssues removes all known-issue records for a specific app.
|
|
func (s *Store) DeleteAppIssues(appName string) (int64, error) {
|
|
res, err := s.db.Exec("DELETE FROM app_log_issues WHERE app_name = ?", appName)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return res.RowsAffected()
|
|
}
|
|
|
|
// DeleteAppIssuesByIDs removes specific issue records by their IDs.
|
|
func (s *Store) DeleteAppIssuesByIDs(ids []int) (int64, error) {
|
|
if len(ids) == 0 {
|
|
return 0, nil
|
|
}
|
|
placeholders := make([]string, len(ids))
|
|
args := make([]interface{}, len(ids))
|
|
for i, id := range ids {
|
|
placeholders[i] = "?"
|
|
args[i] = id
|
|
}
|
|
query := "DELETE FROM app_log_issues WHERE id IN (" + strings.Join(placeholders, ",") + ")"
|
|
res, err := s.db.Exec(query, args...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return res.RowsAffected()
|
|
}
|