我目前正在尝试使用 Raspberry Pi。我正在运行 Snort,它是数据包检测软件。在 Snort 发出警报的情况下,我想执行一个(Python)脚本。

在树莓派上执行 Snort 如下:

sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf

我创建了一个 python 脚本,当被调用时,它控制树莓派的 GPIO 引脚。将其更多地放在上下文中;当树莓派收到 ping/ICMP 数据包时,红色警报灯会亮起并由同一设备控制。

snort 规则当前有效,当 ICMP 数据包到达时,将向控制台输出警报。但是我不知道如何让 snort 执行 python 脚本

最佳答案

以下是 3 个选项,希望其中一个会起作用:

  • “严格” subprocess 方法使用子进程的 PIPE s
  • 使用 pexpect 的方法——“Pexpect 是一个纯 Python 模块,用于生成子应用程序;控制它们;并响应其输出中的预期模式。” -- 并不是说​​这是您必须与默认 python 安装分开获取的唯一非标准包。
  • 使用伪终端和老式 select 读取文件描述符的方法

  • 每种方法都捆绑在一个 try_[SOME APPROACH] 函数中。您应该能够更新顶部的 3 个参数,然后注释/取消注释底部的一种方法以试一试。

    独立测试两半可能是值得的。换句话说,snort + 我的 rpi.py(如下)。然后,如果可行,我的 timed_printer.py(如下)和你的 python 脚本来切换 RPi GPIO。如果它们都独立工作,那么您可以确信不需要做太多事情就可以使整个工作流程正常运行。

    代码
    import subprocess
    
    _cmd_lst = ['python', '-u', 'timed_printer.py']  # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
    _rpi_lst = ['python', '-u', 'rpi.py']            # python script that toggles RPi
    _alert = 'TIME'                                  # The keyword you're looking for
                                                     #  in snort output
    
    #===============================================================================
    
    # Simple helper function that calls the RPi toggle script
    def toggle_rpi():
        subprocess.call(_rpi_lst)
    
    
    
    def try_subprocess(cmd_lst, alert, rpi_lst):
        p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1)
    
        try:
            while True:
                for line in iter(p.stdout.readline, b''):
                    print("try_subprocess() read: %s" % line.strip())
    
                    if alert in line:
                        print("try_subprocess() found alert: %s" % alert)
                        toggle_rpi()
    
        except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
        except Exception as ex:     print ex
        finally:
            print("Cleaning up...")
            p.kill()
            print("Goodbye.")
    
    
    
    def try_pexpect(cmd_lst, alert, rpi_lst):
        import pexpect # http://pexpect.sourceforge.net/pexpect.html
    
        p = pexpect.spawn(' '.join(cmd_lst))
    
        try:
            while True:
                p.expect(alert)     # This blocks until <alert> is found in the output of cmd_str
                print("try_pexpect() found alert: %s" % alert)
                toggle_rpi()
    
        except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
        except Exception as ex:     print ex
        finally:
            print("Cleaning up...")
            p.close(force=True)
            print("Goodbye.")
    
    
    
    def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048):
        import pty, os, select
    
        mfd, sfd = pty.openpty()
    
        p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1)
    
        try:
            while True:
                rlist, _, _, = select.select([mfd], [], [])
    
                if rlist:
                    data = os.read(mfd, MAX_READ)
                    print("try_pty() read: %s" % data.strip())
                    if not data:
                        print("try_pty() got EOF -- exiting")
                        break
                    if alert in data:
                        print("try_pty() found alert: %s" % alert)
                        toggle_rpi()
                elif p.poll() is not None:
                    print("try_pty() had subprocess end -- exiting")
                    break
    
        except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
        except Exception as ex:     print ex
        finally:
            print("Cleaning up...")
            os.close(sfd)
            os.close(mfd)
            p.kill()
            print("Goodbye.")
    
    #===============================================================================
    
    try_subprocess(_cmd_lst, _alert, _rpi_lst)
    #try_pexpect(_cmd_lst, _alert, _rpi_lst)
    #try_pty(_cmd_lst, _alert, _rpi_lst)
    

    测试说明

    为了模拟你的 snort 脚本(一个“挂起”的脚本,然后打印一些东西,然后回到挂起等状态),我写了这个简单的 python 脚本,我称之为 timed_printer.py :
    import time
    while True:
        print("TIME: %s" % time.time())
        time.sleep(5)
    

    我的 rpi.py 文件很简单:
    print("TOGGLING OUTPUT PIN")
    

    这里没有明确的输出刷新,以最好地模拟正常输出。

    最后的考虑

    第一种方法将一次读取整行。所以如果你希望你的 alert 包含在一行中,你会没事的。

    第二种方法 ( pexpect ) 将阻塞,直到遇到 alert

    第三种方法将读取 as soon as data is available ,我应该指出它不一定是完整的一行。如果您看到 try_pty() read: 带有 snort 输出行的片段,导致您错过警报,则需要添加某种缓冲解决方案。

    文档
  • subprocess
  • pexpect
  • pty , select

  • 引用资料: 1 , 2

    关于python - 在 Snort 警报上执行脚本,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28753762/

    10-12 20:31