画的博客
页面 首页 主题 在线查询 分类 默认 接口 技术 逆向 后台 登录
发布于2025年08月16日2 条评论

Python 写一个sshd 日志分析模块

  • Ubuntu/Debian:/var/log/auth.log
    记录用户登录、注销、权限变更等信息,包括 SSH 登录失败记录。
  • 可通过以下命令方式查看:

    1. ​​查看完整日志​​
    sudo cat /var/log/auth.log | grep sshd  # 过滤出SSH相关日志
  1. ​​实时监控日志​​

    sudo tail -f /var/log/auth.log | grep sshd  # 实时输出最新SSH日志
  2. 分析登录失败记录​​

    sudo grep "Failed password" /var/log/auth.log  # 提取登录失败信息

python 模板

import re
from collections import defaultdict
from datetime import datetime

# 日志文件路径(根据实际情况调整)
LOG_FILE = 'auth.log'

def parse_log_line(line):
    """
    解析日志行,提取日期和相关信息
    """
    # 匹配日期部分 (Aug 10 00:05:01)
    date_match = re.match(r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', line)
    if date_match:
        date_str = date_match.group(1)
        # 添加年份以便解析
        date_str_with_year = f"{datetime.now().year} {date_str}"
        try:
            date_obj = datetime.strptime(date_str_with_year, '%Y %b %d %H:%M:%S')
            return date_obj, line
        except ValueError:
            pass
    return None, line

def analyze_ssh_log():
    """
    分析SSH登录日志,统计成功和失败的登录尝试
    """
    # 存储每天的统计数据
    daily_stats = defaultdict(lambda: {
        'successful_logins': 0,
        'failed_logins': 0,
        'failed_ips': defaultdict(int),
        'successful_ips': defaultdict(int),
        'failed_users': defaultdict(int)
    })
    
    # 存储总统计数据
    total_stats = {
        'successful_logins': 0,
        'failed_logins': 0,
        'unique_ips': set(),
        'unique_failed_ips': set()
    }
    
    try:
        with open(LOG_FILE, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                date_obj, log_line = parse_log_line(line.strip())
                if not date_obj:
                    continue
                
                date_key = date_obj.strftime('%Y-%m-%d')
                
                # 检查登录成功的模式
                # 1. 公钥认证成功
                if 'Accepted publickey' in log_line:
                    total_stats['successful_logins'] += 1
                    daily_stats[date_key]['successful_logins'] += 1
                    
                    # 提取IP地址
                    ip_match = re.search(r'from ([\d\.]+) port', log_line)
                    if ip_match:
                        ip = ip_match.group(1)
                        daily_stats[date_key]['successful_ips'][ip] += 1
                        total_stats['unique_ips'].add(ip)
                
                # 2. session opened for user (非CRON)
                elif 'sshd:session' in log_line and 'session opened for user' in log_line:
                    total_stats['successful_logins'] += 1
                    daily_stats[date_key]['successful_logins'] += 1
                    
                    # 提取IP地址
                    ip_match = re.search(r'from ([\d\.]+) port', log_line)
                    if ip_match:
                        ip = ip_match.group(1)
                        daily_stats[date_key]['successful_ips'][ip] += 1
                        total_stats['unique_ips'].add(ip)
                
                # 检查登录失败的模式
                elif 'Failed password' in log_line:
                    total_stats['failed_logins'] += 1
                    daily_stats[date_key]['failed_logins'] += 1
                    
                    # 提取IP地址
                    ip_match = re.search(r'from ([\d\.]+) port', log_line)
                    if ip_match:
                        ip = ip_match.group(1)
                        daily_stats[date_key]['failed_ips'][ip] += 1
                        total_stats['unique_failed_ips'].add(ip)
                    
                    # 提取用户名
                    user_match = re.search(r'Failed password for (?:invalid user )?(\S+) from', log_line)
                    if user_match:
                        user = user_match.group(1)
                        daily_stats[date_key]['failed_users'][user] += 1
                
                # 检查无效用户登录尝试
                elif 'Invalid user' in log_line:
                    total_stats['failed_logins'] += 1
                    daily_stats[date_key]['failed_logins'] += 1
                    
                    # 提取IP地址
                    ip_match = re.search(r'from ([\d\.]+) port', log_line)
                    if ip_match:
                        ip = ip_match.group(1)
                        daily_stats[date_key]['failed_ips'][ip] += 1
                        total_stats['unique_failed_ips'].add(ip)
                        
    except FileNotFoundError:
        print(f"错误: 找不到日志文件 {LOG_FILE}")
        return
    except Exception as e:
        print(f"处理日志文件时出错: {e}")
        return
    
    # 输出统计结果
    print("=" * 70)
    print("SSH 登录统计分析报告")
    print("=" * 70)
    print(f"总成功登录次数: {total_stats['successful_logins']}")
    print(f"总失败登录次数: {total_stats['failed_logins']}")
    print(f"唯一成功登录IP数: {len(total_stats['unique_ips'])}")
    print(f"唯一失败登录IP数: {len(total_stats['unique_failed_ips'])}")
    print()
    
    # 按日期排序输出每日统计
    print("每日登录统计:")
    print("-" * 70)
    print(f"{'日期':<12} {'成功登录':<10} {'失败登录':<10} {'成功率':<10} {'失败率':<10}")
    print("-" * 70)
    
    sorted_dates = sorted(daily_stats.keys())
    for date in sorted_dates:
        stats = daily_stats[date]
        total_attempts = stats['successful_logins'] + stats['failed_logins']
        success_rate = (stats['successful_logins'] / total_attempts * 100) if total_attempts > 0 else 0
        failure_rate = (stats['failed_logins'] / total_attempts * 100) if total_attempts > 0 else 0
        
        print(f"{date:<12} {stats['successful_logins']:<10} {stats['failed_logins']:<10} "
              f"{success_rate:<9.1f}% {failure_rate:<9.1f}%")
    
    # 输出最近几天的详细信息
    print("\n最近几天的详细信息:")
    print("-" * 70)
    for date in sorted(daily_stats.keys(), reverse=True)[:5]:  # 显示最近5天
        stats = daily_stats[date]
        print(f"\n{date} 详细信息:")
        print(f"  成功登录: {stats['successful_logins']}")
        print(f"  失败登录: {stats['failed_logins']}")
        
        if stats['failed_users']:
            print("  最常见的失败登录用户:")
            sorted_users = sorted(stats['failed_users'].items(), key=lambda x: x[1], reverse=True)[:5]
            for user, count in sorted_users:
                print(f"    {user}: {count} 次")
        
        if stats['failed_ips']:
            print("  最常见的失败登录IP:")
            sorted_ips = sorted(stats['failed_ips'].items(), key=lambda x: x[1], reverse=True)[:5]
            for ip, count in sorted_ips:
                print(f"    {ip}: {count} 次")
        
        if stats['successful_ips']:
            print("  成功登录IP:")
            sorted_succ_ips = sorted(stats['successful_ips'].items(), key=lambda x: x[1], reverse=True)[:5]
            for ip, count in sorted_succ_ips:
                print(f"    {ip}: {count} 次")
    
    # 输出安全建议
    print("\n安全建议:")
    print("-" * 70)
    if total_stats['failed_logins'] > total_stats['successful_logins'] * 2:
        print("  ⚠️  失败登录次数远超成功登录次数,可能正在遭受暴力破解攻击")
    
    all_failed_ips = set()
    for date in daily_stats:
        all_failed_ips.update(daily_stats[date]['failed_ips'].keys())
    
    if len(all_failed_ips) > 50:
        print("  ⚠️  检测到来自大量不同IP的登录尝试,可能正在遭受分布式攻击")
    
    # 检查是否有特别频繁的IP
    ip_counts = defaultdict(int)
    for date in daily_stats:
        for ip, count in daily_stats[date]['failed_ips'].items():
            ip_counts[ip] += count
    
    frequent_ips = [ip for ip, count in ip_counts.items() if count > 100]
    if frequent_ips:
        print(f"  ⚠️  检测到来自以下IP的频繁登录尝试 (>100次): {', '.join(frequent_ips)}")

if __name__ == "__main__":
    analyze_ssh_log()

标签: Linux, python

已有 2 条评论

  1. 可莉

    西风骑士团「火花骑士」,可莉,前来报到。呃,后面该说什么词来着?可莉背不下来啦。

  2. 可莉

    早啊,带可莉出去玩吧,我们一起来冒险。

添加新评论

提交评论