E11 — Detecção de Anomalias e Análise Inteligente de Logs com IA

E11 — Detecção de Anomalias e Análise Inteligente de Logs com IA

Extensão C · AIOps e IA Aplicada a DevOps · Artigo E11 de E12 Prof. Ricardo Matos — Dominando DevOps & Cloud em 1 Ano


O Problema do Volume

Uma aplicação moderna em produção gera dezenas de milhares de linhas de log por minuto. Um cluster Kubernetes com dez serviços pode gerar gigabytes de logs por hora. Ler logs manualmente durante um incidente é como procurar uma agulha em um palheiro que cresce enquanto você procura.

A análise tradicional de logs funciona assim: o engenheiro sabe o que procurar, escreve uma query no Kibana ou CloudWatch Insights, filtra por nível de erro, e percorre os resultados. Isso pressupõe que o engenheiro já tem uma hipótese — o que é frequentemente falso nos primeiros minutos de um incidente.

A análise inteligente de logs com ML inverte a lógica: em vez de procurar por padrões conhecidos, o sistema identifica o que é incomum em relação ao comportamento histórico. Novos padrões de erro que nunca apareceram. Mensagens que passaram a aparecer com frequência anormalmente alta. Correlações entre eventos em serviços diferentes que nunca co-ocorreram antes.

Detecção de Anomalias com Prometheus e Python

Para times que já têm Prometheus como sistema de métricas, é possível construir detecção de anomalias sem ferramentas adicionais usando a API HTTP do Prometheus e análise em Python. O exemplo a seguir implementa um detector simples baseado em Z-score:

#!/usr/bin/env python3
# detector_anomalias.py
# Detecta anomalias em métricas do Prometheus usando Z-score

import requests
import numpy as np
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
import json
import os

PROMETHEUS_URL = os.getenv('PROMETHEUS_URL', 'http://localhost:9090')
SLACK_WEBHOOK = os.getenv('SLACK_WEBHOOK_URL')
ZSCORE_THRESHOLD = 3.0  # Desvios padrão para considerar anomalia

@dataclass
class Anomalia:
    metrica: str
    labels: dict
    valor_atual: float
    media_historica: float
    desvio_padrao: float
    zscore: float
    timestamp: datetime

def buscar_historico(query: str, horas: int = 24) -> list[tuple[float, float]]:
    """Busca o histórico de uma métrica no Prometheus."""
    fim = datetime.utcnow()
    inicio = fim - timedelta(hours=horas)

    response = requests.get(
        f'{PROMETHEUS_URL}/api/v1/query_range',
        params={
            'query': query,
            'start': inicio.timestamp(),
            'end': fim.timestamp(),
            'step': '5m'
        },
        timeout=30
    )
    response.raise_for_status()
    data = response.json()

    if data['status'] != 'success' or not data['data']['result']:
        return []

    # Retorna apenas os valores (timestamp, valor)
    return [(float(ts), float(val))
            for ts, val in data['data']['result'][0]['values']]

def detectar_anomalia(
    query: str,
    nome_metrica: str,
    janela_historico_horas: int = 168  # 7 dias
) -> Optional[Anomalia]:
    """
    Detecta se o valor atual de uma métrica é anômalo
    comparado com o histórico recente.
    """
    # Buscar histórico completo
    historico = buscar_historico(query, janela_historico_horas)
    if len(historico) < 10:
        return None  # Histórico insuficiente

    valores = np.array([v for _, v in historico])
    valor_atual = valores[-1]

    # Excluir o valor atual do cálculo de baseline
    baseline = valores[:-1]
    media = np.mean(baseline)
    desvio = np.std(baseline)

    if desvio == 0:
        return None  # Métrica constante — sem anomalia possível

    zscore = abs((valor_atual - media) / desvio)

    if zscore >= ZSCORE_THRESHOLD:
        return Anomalia(
            metrica=nome_metrica,
            labels={},
            valor_atual=valor_atual,
            media_historica=media,
            desvio_padrao=desvio,
            zscore=zscore,
            timestamp=datetime.utcnow()
        )

    return None

def notificar_slack(anomalia: Anomalia):
    """Envia notificação de anomalia para o Slack."""
    if not SLACK_WEBHOOK:
        print(f"ANOMALIA: {anomalia}")
        return

    direcao = "↑" if anomalia.valor_atual > anomalia.media_historica else "↓"
    variacao_pct = abs(
        (anomalia.valor_atual - anomalia.media_historica)
        / anomalia.media_historica * 100
    )

    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"⚠️ Anomalia Detectada: {anomalia.metrica}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*Valor atual:*\n{anomalia.valor_atual:.2f} {direcao}"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Média histórica:*\n{anomalia.media_historica:.2f}"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Desvio (Z-score):*\n{anomalia.zscore:.1f}σ"
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Variação:*\n{variacao_pct:.0f}% acima/abaixo do normal"
                    }
                ]
            }
        ]
    }

    requests.post(SLACK_WEBHOOK, json=payload, timeout=10)

def executar_verificacoes():
    """Executa todas as verificações de anomalia configuradas."""
    verificacoes = [
        (
            'sum(rate(http_requests_total{status=~"5..",namespace="producao"}[5m])) / '
            'sum(rate(http_requests_total{namespace="producao"}[5m])) * 100',
            'Taxa de Erro HTTP (%)'
        ),
        (
            'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket'
            '{namespace="producao"}[5m])) by (le))',
            'Latência p99 (segundos)'
        ),
        (
            'sum(rate(loja_pedidos_criados_total[5m])) * 60',
            'Pedidos por Minuto'
        ),
        (
            'sum(rate(loja_checkouts_concluidos_total[30m])) / '
            'sum(rate(loja_checkouts_iniciados_total[30m])) * 100',
            'Taxa de Conversão (%)'
        ),
    ]

    anomalias_detectadas = []
    for query, nome in verificacoes:
        anomalia = detectar_anomalia(query, nome)
        if anomalia:
            anomalias_detectadas.append(anomalia)
            notificar_slack(anomalia)

    print(f"Verificação concluída: {len(anomalias_detectadas)} anomalia(s) detectada(s)")
    return anomalias_detectadas

if __name__ == '__main__':
    executar_verificacoes()

Análise de Logs com LLM Durante Incidentes

O uso mais imediato de LLMs em operações é a análise de logs durante incidentes. O processo é simples: coletar os logs relevantes do período do incidente, enviá-los ao modelo com contexto sobre o sistema, e perguntar o que está errado.

O desafio é o volume. Logs de 30 minutos de um sistema em produção podem ter milhares de linhas — mais do que cabe no contexto de um LLM. A solução é filtrar e sumarizar antes de enviar:

#!/usr/bin/env python3
# analise_incidente.py
# Analisa logs de um incidente usando LLM

import anthropic
import subprocess
import sys
from datetime import datetime, timedelta

def coletar_logs_kubernetes(
    namespace: str,
    inicio: datetime,
    fim: datetime,
    max_linhas: int = 500
) -> dict[str, str]:
    """Coleta logs dos pods em um namespace no período do incidente."""
    logs = {}

    # Listar pods no namespace
    resultado = subprocess.run(
        ['kubectl', 'get', 'pods', '-n', namespace, '-o', 'name'],
        capture_output=True, text=True
    )
    pods = resultado.stdout.strip().split('\n')

    for pod in pods:
        pod_nome = pod.replace('pod/', '')
        resultado = subprocess.run(
            [
                'kubectl', 'logs', pod_nome,
                '-n', namespace,
                '--since-time', inicio.isoformat() + 'Z',
                '--tail', str(max_linhas)
            ],
            capture_output=True, text=True
        )

        if resultado.stdout:
            # Filtrar apenas linhas de erro e warning para reduzir volume
            linhas_relevantes = [
                linha for linha in resultado.stdout.split('\n')
                if any(nivel in linha.upper()
                       for nivel in ['ERROR', 'WARN', 'FATAL', 'EXCEPTION', 'PANIC'])
            ]
            if linhas_relevantes:
                logs[pod_nome] = '\n'.join(linhas_relevantes[:100])

    return logs

def analisar_incidente_com_llm(
    logs: dict[str, str],
    descricao_incidente: str,
    arquitetura_sistema: str
) -> str:
    """Envia logs para análise pelo LLM e retorna o diagnóstico."""
    cliente = anthropic.Anthropic()

    # Formatar os logs para o contexto
    logs_formatados = ""
    for servico, conteudo in logs.items():
        if conteudo.strip():
            logs_formatados += f"\n=== {servico} ===\n{conteudo}\n"

    prompt = f"""Você é um engenheiro sênior de SRE analisando um incidente de produção.

DESCRIÇÃO DO INCIDENTE:
{descricao_incidente}

ARQUITETURA DO SISTEMA:
{arquitetura_sistema}

LOGS COLETADOS (apenas erros e warnings do período do incidente):
{logs_formatados[:8000]}  # Limitar para não exceder o contexto

Analise os logs e forneça:

1. **Causa raiz provável**: O que causou o incidente, com evidências dos logs.

2. **Linha do tempo**: Reconstrua a sequência de eventos com base nos timestamps.

3. **Serviços afetados**: Quais serviços foram impactados e em que ordem.

4. **Ações imediatas recomendadas**: O que fazer agora para resolver ou mitigar.

5. **Prevenção**: O que pode ser feito para evitar que isso se repita.

Seja específico e referencie as mensagens de log relevantes quando possível.
Responda em português."""

    mensagem = cliente.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2000,
        messages=[{"role": "user", "content": prompt}]
    )

    return mensagem.content[0].text

def main():
    # Exemplo de uso durante um incidente real
    NAMESPACE = "producao"
    INICIO_INCIDENTE = datetime.utcnow() - timedelta(minutes=30)
    FIM_INCIDENTE = datetime.utcnow()

    DESCRICAO = """
    Às 14:23 os alertas começaram a disparar: queda de 60% nos pedidos por minuto
    e aumento da taxa de erro HTTP para 15%. O time foi acionado às 14:25.
    O deploy mais recente foi às 14:20 (versão v2.3.1 do order-service).
    """

    ARQUITETURA = """
    Sistema de e-commerce com microsserviços:
    - api-gateway: recebe requisições e roteia para os serviços
    - catalog-service: gerencia produtos e estoque (PostgreSQL + Redis cache)
    - order-service: cria e gerencia pedidos (PostgreSQL, publica no SQS)
    - notification-service: consome SQS e envia emails/SMS
    Todos rodando no Kubernetes (EKS) com HPA configurado.
    """

    print("Coletando logs do período do incidente...")
    logs = coletar_logs_kubernetes(NAMESPACE, INICIO_INCIDENTE, FIM_INCIDENTE)
    print(f"Logs coletados de {len(logs)} serviço(s)")

    if not logs:
        print("Nenhum log de erro encontrado no período")
        sys.exit(0)

    print("\nAnalisando com LLM...")
    diagnostico = analisar_incidente_com_llm(logs, DESCRICAO, ARQUITETURA)

    print("\n" + "="*60)
    print("DIAGNÓSTICO DO INCIDENTE")
    print("="*60)
    print(diagnostico)

    # Salvar o diagnóstico para o postmortem
    with open(f'diagnostico-{datetime.now().strftime("%Y%m%d-%H%M")}.md', 'w') as f:
        f.write(f"# Diagnóstico do Incidente — {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n")
        f.write(f"## Descrição\n{DESCRICAO}\n\n")
        f.write(f"## Análise\n{diagnostico}\n")

if __name__ == '__main__':
    main()

Classificação Automática de Alertas

Outro caso de uso prático: usar um LLM para classificar e priorizar alertas que chegam em volume, reduzindo o ruído antes de acordar alguém:

# classificador_alertas.py
# Recebe alertas do Alertmanager via webhook e classifica com LLM

from flask import Flask, request, jsonify
import anthropic
import json

app = Flask(__name__)
cliente = anthropic.Anthropic()

RUNBOOKS_BASE_URL = "https://wiki.empresa.com/runbooks"

def classificar_alerta(alerta: dict) -> dict:
    """Classifica um alerta e sugere ação usando LLM."""

    nome = alerta.get('labels', {}).get('alertname', 'Desconhecido')
    severidade = alerta.get('labels', {}).get('severity', 'unknown')
    descricao = alerta.get('annotations', {}).get('description', '')
    sumario = alerta.get('annotations', {}).get('summary', '')

    prompt = f"""Você é um sistema de triagem de alertas de infraestrutura.

ALERTA RECEBIDO:
- Nome: {nome}
- Severidade declarada: {severidade}
- Resumo: {sumario}
- Descrição: {descricao}

Com base nas informações acima, forneça uma análise em JSON com exatamente estes campos:
{{
  "urgencia": "imediata|alta|media|baixa",
  "acordar_plantao": true/false,
  "provavel_causa": "descrição em uma frase",
  "primeira_acao": "o que fazer primeiro",
  "pode_ser_falso_positivo": true/false,
  "razao_falso_positivo": "se aplicável, por quê poderia ser falso positivo"
}}

Responda APENAS com o JSON, sem texto adicional."""

    resposta = cliente.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=400,
        messages=[{"role": "user", "content": prompt}]
    )

    try:
        classificacao = json.loads(resposta.content[0].text)
    except json.JSONDecodeError:
        classificacao = {
            "urgencia": "alta",
            "acordar_plantao": True,
            "provavel_causa": "Erro na classificação automática",
            "primeira_acao": "Investigar manualmente",
            "pode_ser_falso_positivo": False,
            "razao_falso_positivo": ""
        }

    return classificacao

@app.route('/webhook/alertmanager', methods=['POST'])
def receber_alertas():
    """Endpoint que recebe alertas do Alertmanager."""
    payload = request.json
    alertas = payload.get('alerts', [])
    resultados = []

    for alerta in alertas:
        if alerta.get('status') != 'firing':
            continue

        classificacao = classificar_alerta(alerta)
        nome = alerta['labels'].get('alertname', 'N/A')

        # Logar para auditoria
        print(f"Alerta: {nome} | Urgência: {classificacao['urgencia']} | "
              f"Acordar plantão: {classificacao['acordar_plantao']}")

        resultados.append({
            'alerta': nome,
            'classificacao': classificacao
        })

        # Notificar apenas se necessário acordar o plantão
        if classificacao['acordar_plantao']:
            enviar_para_pagerduty(alerta, classificacao)
        else:
            # Apenas registrar no canal de alertas sem notificação urgente
            enviar_para_slack_informativo(alerta, classificacao)

    return jsonify({'processados': len(resultados), 'resultados': resultados})

def enviar_para_pagerduty(alerta, classificacao):
    # Implementação da integração com PagerDuty
    pass

def enviar_para_slack_informativo(alerta, classificacao):
    # Implementação da notificação Slack sem urgência
    pass

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

Referências para Aprofundamento

— Prometheus HTTP API: https://prometheus.io/docs/prometheus/latest/querying/api/ — Anthropic API — Messages: https://docs.anthropic.com/en/api/messages — Elastic ML — Anomaly Detection: https://www.elastic.co/guide/en/machine-learning/current/ml-ad-overview.html — AWS CloudWatch Anomaly Detection: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Anomaly_Detection.html