OSSIM Agent的主要職責是收集網絡上存在的各類設備發送的全部數據,而後按照一種標準方式有序發給OSSIM Server,Agent收集到數據後在發送給Server以前要對這些數據進行歸一化處理,本文主要就如何有序發送數據與如何完成歸一化進行討論。
OSSIM傳感器在經過GET框架實現OSSIM代理和OSSIM服務器之間通訊協議和數據格式的之間轉換。下面咱們先簡要看一下ossim-agent腳本:php
#!/usr/bin/python -OOt import sys sys.path.append('/usr/share/ossim-agent/') sys.path.append('/usr/local/share/ossim-agent/') from ossim_agent.Agent import Agent agent = Agent() agent.main()
這裏須要GET做爲OSSIM代理向OSSIM服務器輸送數據。實現緊密整合所需的兩個主要操做是「生成」(或)OSSIM兼容事件的「映射Mapping」)和此類數據向OSSIM的「傳輸」服務器。它負責此類操做的GET框架的兩個組件是EventHandler和Sender Agent,如圖1所示。
圖1 將Get框架內容集成到OSSIM 前端
Event Handler的主要任務是映射數據源插件採集的事件到SIEM實例警報的OSSIM標準化事件格式。爲了執行這樣的過程,原始消息經歷由RAW LOG轉換爲現有歸一化數據字段格式的一個轉變;在上圖中咱們將這些機制表示爲「歸一化Normalization」和「OSSIM消息」。部分日誌歸一化代碼:python
from Logger import Logger from time import mktime, strptime logger = Logger.logger class Event: EVENT_TYPE = 'event' EVENT_ATTRS = [ "type", "date", "sensor", "interface", "plugin_id", "plugin_sid", "priority", "protocol", "src_ip", "src_port", "dst_ip", "dst_port", "username", "password", "filename", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "occurrences", "log", "data", "snort_sid", # snort specific "snort_cid", # snort specific "fdate", "tzone" ] def __init__(self): self.event = {} self.event["event_type"] = self.EVENT_TYPE def __setitem__(self, key, value): if key in self.EVENT_ATTRS: self.event[key] = self.sanitize_value(value) if key == "date": # 以秒爲單位 self.event["fdate"]=self.event[key] try: self.event["date"]=int(mktime(strptime(self.event[key],"%Y-%m-%d %H:%M:%S"))) except: logger.warning("There was an error parsing date (%s)" %\ (self.event[key])) elif key != 'event_type': logger.warning("Bad event attribute: %s" % (key)) def __getitem__(self, key): return self.event.get(key, None) # 事件表示 def __repr__(self): event = self.EVENT_TYPE for attr in self.EVENT_ATTRS: if self[attr]: event += ' %s="%s"' % (attr, self[attr]) return event + "\n" # 返回內部哈希值 def dict(self): return self.event def sanitize_value(self, string): return str(string).strip().replace("\"", "\\\"").replace("'", "") class EventOS(Event): EVENT_TYPE = 'host-os-event' EVENT_ATTRS = [ "host", "os", "sensor", "interface", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ] class EventMac(Event): EVENT_TYPE = 'host-mac-event' EVENT_ATTRS = [ "host", "mac", "vendor", "sensor", "interface", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ] class EventService(Event): EVENT_TYPE = 'host-service-event' EVENT_ATTRS = [ "host", "sensor", "interface", "port", "protocol", "service", "application", "date", "plugin_id", "plugin_sid", "occurrences", "log", "fdate", ] class EventHids(Event): EVENT_TYPE = 'host-ids-event' EVENT_ATTRS = [ "host", "hostname", "hids_event_type", "target", "what", "extra_data", "sensor", "date", "plugin_id", "plugin_sid", "username", "password", "filename", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "occurrences", "log", "fdate", ] class WatchRule(Event): EVENT_TYPE = 'event' EVENT_ATTRS = [ "type", "date", "fdate", "sensor", "interface", "src_ip", "dst_ip", "protocol", "plugin_id", "plugin_sid", "condition", "value", "port_from", "src_port", "port_to", "dst_port", "interval", "from", "to", "absolute", "log", "userdata1", "userdata2", "userdata3", "userdata4", "userdata5", "userdata6", "userdata7", "userdata8", "userdata9", "filename", "username", ] class Snort(Event): EVENT_TYPE = 'snort-event' EVENT_ATTRS = [ "sensor", "interface", "gzipdata", "unziplen", "event_type", "plugin_id", "type", "occurrences" ]
日誌編碼代碼:正則表達式
import threading, time from Logger import Logger logger = Logger.logger from Output import Output import Config import Event from Threshold import EventConsolidation from Stats import Stats from ConnPro import ServerConnPro class Detector(threading.Thread): def __init__(self, conf, plugin, conn): self._conf = conf self._plugin = plugin self.os_hash = {} self.conn = conn self.consolidation = EventConsolidation(self._conf) logger.info("Starting detector %s (%s).." % \ (self._plugin.get("config", "name"), self._plugin.get("config", "plugin_id"))) threading.Thread.__init__(self) def _event_os_cached(self, event): if isinstance(event, Event.EventOS): import string current_os = string.join(string.split(event["os"]), ' ') previous_os = self.os_hash.get(event["host"], '') if current_os == previous_os: return True else: # 失敗並添加到緩存 self.os_hash[event["host"]] = \ string.join(string.split(event["os"]), ' ') return False def _exclude_event(self, event): if self._plugin.has_option("config", "exclude_sids"): exclude_sids = self._plugin.get("config", "exclude_sids") if event["plugin_sid"] in Config.split_sids(exclude_sids): logger.debug("Excluding event with " +\ "plugin_id=%s and plugin_sid=%s" %\ (event["plugin_id"], event["plugin_sid"])) return True return False def _thresholding(self): self.consolidation.process() def _plugin_defaults(self, event): # 從配置文件中獲取默認參數 if self._conf.has_section("plugin-defaults"): # 1) 日期 default_date_format = self._conf.get("plugin-defaults", "date_format") if event["date"] is None and default_date_format and \ 'date' in event.EVENT_ATTRS: event["date"] = time.strftime(default_date_format, time.localtime(time.time())) # 2) 傳感器 default_sensor = self._conf.get("plugin-defaults", "sensor") if event["sensor"] is None and default_sensor and \ 'sensor' in event.EVENT_ATTRS: event["sensor"] = default_sensor # 3) 網絡接口 default_iface = self._conf.get("plugin-defaults", "interface") if event["interface"] is None and default_iface and \ 'interface' in event.EVENT_ATTRS: event["interface"] = default_iface # 4) 源IP if event["src_ip"] is None and 'src_ip' in event.EVENT_ATTRS: event["src_ip"] = event["sensor"] # 5) 時區 default_tzone = self._conf.get("plugin-defaults", "tzone") if event["tzone"] is None and 'tzone' in event.EVENT_ATTRS: event["tzone"] = default_tzone # 6) sensor,source ip and dest != localhost if event["sensor"] in ('127.0.0.1', '127.0.1.1'): event["sensor"] = default_sensor if event["dst_ip"] in ('127.0.0.1', '127.0.1.1'): event["dst_ip"] = default_sensor if event["src_ip"] in ('127.0.0.1', '127.0.1.1'): event["src_ip"] = default_sensor # 檢測日誌的類型 if event["type"] is None and 'type' in event.EVENT_ATTRS: event["type"] = 'detector' return event def send_message(self, event): if self._event_os_cached(event): return if self._exclude_event(event): return #對於一些空屬性使用默認值。 event = self._plugin_defaults(event) # 合併以前檢查 if self.conn is not None: try: self.conn.send(str(event)) except: id = self._plugin.get("config", "plugin_id") c = ServerConnPro(self._conf, id) self.conn = c.connect(0, 10) try: self.conn.send(str(event)) except: return logger.info(str(event).rstrip()) elif not self.consolidation.insert(event): Output.event(event) Stats.new_event(event) def stop(self): #self.consolidation.clear() pass #在子類中重寫 def process(self): pass def run(self): self.process() class ParserSocket(Detector): def process(self): self.process() class ParserDatabase(Detector): def process(self): self.process()
… …數據庫
從上能夠看出,傳感器的歸一化主要負責對每一個LOG內數據字段進行從新編碼,使其生成一個全新的可能用於發送到OSSIM服務器的完整事件。爲達成這種目的GET框架中包含了一些特定的功能,以便將全部的功能轉換須要BASE64轉換的字段。「OSSIM消息」負責填充GET生成的原始事件中不存在的字段。因此上面講的plugin_id、plugin_sid是用來表示日誌消息來源類型和子類型,這也是生成SIEM事件的必填字段。爲事件格式完整性考慮,有些時候在沒法確認源或目標IP時,系統默認會採用0.0.0.0來填充該字段。apache
注意:這種必填字段咱們可利用phpmyadmin工具查看OSSIM的MySQL數據庫。
Sender Agent負責完成下面兩個任務:
發送由GET收集並由事件格式化的事件發送到OSSIM服務器,這項任務由Event Hander建立的消息組成消息隊列發送到消息中間件實現,時序圖如圖2所示。
圖2 序列圖: 從安全探測器的日誌轉換到OSSIM服務器事件緩存
2)管理GET框架和OSSIM服務器之間的通訊,通訊端口爲TCP 40001經過雙向握手實現。歸一化原始日誌是規範化過程的一個重要環節,OSSIM在歸一化處理日誌的同時保留了原始日誌,可用於日誌歸檔,提供了一種從規範化事件中提取原始日誌的手段。
通過歸一化處理的EVENTS,存儲到MySQL數據庫中,如圖3所示。接着就由關聯引擎根據規則、優先級、可靠性等參數進行交叉關聯分析,得出風險值併發出各類報警提示信息。
圖3 OSSIM平臺日誌存儲機制
接下來咱們再看個實例,下面是一段Apache、CiscoASA以及SSH的原始日誌,如圖四、圖五、圖6所示。安全
event_type=event regexp=((?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(:(?P\d{1,5}))? )?(?P\S+) (?P\S+) (?P\S+) \[(?P\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})\s+[+-]\d{4}\] \"(?P[^\"]*)\" (?P\d{3}) ((?P\d+)|-)( \"(?P[^\"]*)\" \"(?P[^\"]*)\")?$ src_ip={resolv($src)} dst_ip={resolv($dst)} dst_port={$port} date={normalize_date($date)} plugin_sid={$code} username={$user} userdata1={$request} userdata2={$size} userdata3={$referer_uri} userdata4={$useragent} filename={$id}
[0002 - apache-error] 錯誤日誌服務器
event_type=event regexp=\[(?P\w{3} \w{3} \d{2} \d{2}:\d{2}:\d{2} \d{4})\] \[(?P(emerg|alert|crit|error|warn|notice|info|debug))\] (\[client (?P\S+)\] )?(?P.*) date={normalize_date($date)} plugin_sid={translate($type)} src_ip={resolv($src)} userdata1={$data}
圖4 Apache原始日誌
圖5 一條Cisco ASA 原始日誌網絡
圖6 Cisco ASA 事件分類
經過過OSSIM歸一化處理後的實際再經過Web前端展示給你們方便閱讀的格式。歸一化處理後的事件和原始日誌的對比方法咱們在《開源安全運維平臺OSSIM疑難解析:入門篇》一書中還會講解。而在圖7所示的例子當中,僅使用了Userdata1和Userdata2,並無用到Userdata3~Userdata9這些是擴展位,主要是爲了預留給其餘設備或服務使用,這裏目標地址會標記成IP地址的形式,例如:Host192.168.11.160。實際上歸一化處理這種操做發生在系統採集和存儲事件以後,關聯和數據分析以前,在SIEM工具中把採集過程當中把數據轉換成易讀懂的格式,採用格式化的數據,能更容易理解。