diff --git a/filter-plugin/logstash-filter-guardium-universal/ARCHITECTURE.md b/filter-plugin/logstash-filter-guardium-universal/ARCHITECTURE.md new file mode 100644 index 000000000..d34875df5 --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/ARCHITECTURE.md @@ -0,0 +1,170 @@ +# Architectural Proposal: Universal Guardium Filter Plugin + +> **This is a suggestion to the project maintainers.** +> The reference implementation here is meant to illustrate the idea concretely, +> not to be merged as-is. Feedback and alternative approaches are very welcome. + +--- + +## The Problem + +Every filter plugin in this repository follows the same structure. +Opening any two plugins side-by-side reveals that they are nearly identical — the only +meaningful difference is the 50–150 lines of parsing logic specific to each datasource. + +Everything else is copy-pasted boilerplate: + +``` +@LogstashPlugin annotation ┐ +implements Filter │ +static Log4j init block │ ~200 lines repeated verbatim +filter() event loop + try/catch │ in every single plugin +GSON serialization │ +correctIPs() utility │ +logEvent() utility │ +configSchema() / getId() ┘ +``` + +This creates real maintenance costs: +- A security fix or utility improvement must be applied to **54 files** +- Adding a new datasource means scaffolding a full Logstash plugin (~500 lines, 8+ files, a new gem) +- 54 separate gem artifacts to build, test, version, and ship + +--- + +## The Suggestion + +> **Replace all 54 individual filter plugins with a single generic plugin, +> where each datasource is just a thin parser class (or ideally just a config file).** + +The Logstash plugin layer should exist exactly once. The only thing that varies between +datasources — the parsing logic — should be expressed in the simplest possible form. 
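+To make "ideally just a config file" concrete: a declarative parser definition for a
+simple datasource could hypothetically look like the sketch below. Every field name
+here is invented purely for illustration; nothing like this exists yet (see the open
+question at the end of this document).
+
+```yaml
+# Hypothetical declarative parser (NOT part of this proposal's implementation)
+datasource: exampledb
+record:
+  sessionId: { from: "connection_id" }
+  dbUser:    { from: "account.user", default: "NA" }
+  time:      { from: "timestamp", format: "yyyy-MM-dd HH:mm:ss", tz: "UTC" }
+  sql:       { from: "general_data.query" }
+skip_when:
+  - field: "class"
+    not_in: ["connection", "general"]
+```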
+ +--- + +## Proposed Architecture + +### Current state (54 plugins) + +``` +┌───────────────────────────────────────────────────────────────┐ +│ logstash-filter-mysql-guardium/ │ +│ ├── build.gradle (190 lines, ~identical across all) │ +│ ├── MySqlFilterGuardium.java │ +│ │ ├── @LogstashPlugin, implements Filter ┐ boilerplate │ +│ │ ├── Log4j init, GSON, correctIPs() │ ~200 lines │ +│ │ ├── filter() loop, error tagging ┘ │ +│ │ └── parseRecord() ← the only unique part │ +│ └── filter.conf: mysql_filter_guardium {} │ +│ │ +│ logstash-filter-mongodb-guardium/ (same structure) │ +│ logstash-filter-snowflake-guardium/ (same structure) │ +│ logstash-filter-postgres-guardium/ (same structure) │ +│ ... × 54 │ +└───────────────────────────────────────────────────────────────┘ + +54 gems · 54 build files · 54 copies of the same boilerplate +``` + +### Proposed state (1 plugin + thin parsers) + +``` +┌───────────────────────────────────────────────────────────────┐ +│ logstash-filter-guardium-universal/ (ONE gem) │ +│ │ │ +│ ├── GuardiumUniversalFilter.java ← all Logstash boilerplate │ +│ │ └── delegates to ──────────────────────────────────┐ │ +│ │ │ │ +│ ├── IGuardiumParser (interface) │ │ +│ │ └── parseRecord(Event) → Record │ │ +│ │ │ │ +│ ├── AbstractGuardiumParser │ │ +│ │ └── correctIPs(), shared utilities │ │ +│ │ │ │ +│ ├── ParserRegistry ←────────────────────────────────── ┘ │ +│ │ ├── "mysql" → MySqlParser (~150 lines) │ +│ │ ├── "mongodb" → MongoDbParser (~ 60 lines) │ +│ │ ├── "snowflake" → SnowflakeParser (~ 40 lines) │ +│ │ └── ... 
(one line per datasource) │ +│ │ │ +│ └── filter.conf: │ +│ guardium_universal_filter { datasource => "mysql" } │ +└───────────────────────────────────────────────────────────────┘ + +1 gem · 1 build file · boilerplate written once +``` + +--- + +## What Changes for Each Datasource + +### `filter.conf` — minimal change + +```diff +- mysql_filter_guardium {} ++ guardium_universal_filter { datasource => "mysql" } +``` + +### Adding a new datasource — before vs. after + +| | Before | After | +|---|---|---| +| Files to create | 8+ (plugin class, build.gradle, VERSION, gemspec, ...) | 1 (parser class) | +| Lines of new code | ~500 | ~100 | +| New gem required | Yes | No | +| Boilerplate to copy | ~200 lines | 0 lines | + +--- + +## Reference Implementation + +This PR includes a working reference implementation to make the idea concrete: + +``` +filter-plugin/logstash-filter-guardium-universal/ +├── GuardiumUniversalFilter.java ← the single Logstash plugin +├── parser/ +│ ├── IGuardiumParser.java ← interface: parseRecord(Event) → Record +│ ├── AbstractGuardiumParser.java ← shared utilities +│ └── ParserRegistry.java ← datasource name → parser instance +├── datasources/ +│ ├── mysql/MySqlParser.java ← MySQL fully migrated (~150 lines) +│ ├── mongodb/MongoDbParser.java ← MongoDB thin connector +│ └── snowflake/SnowflakeParser.java← Snowflake thin connector +└── [MySQL|MongoDB|Snowflake]*Package/filter.conf +``` + +**MySQL is fully migrated** as a concrete example — its parsing logic is identical to the +original, just extracted into a plain Java class with no Logstash dependency. +MongoDB and Snowflake are included as thin connectors to show how complex, multi-class +parser hierarchies integrate cleanly. 
+ +--- + +## Migration Strategy + +The migration can be done incrementally with zero disruption: + +``` +Phase 1 Framework + 3 reference parsers (this PR) +Phase 2 Migrate remaining 51 parsers one by one (mechanical extraction) +Phase 3 Move parser class hierarchies fully into the new plugin +Phase 4 Deprecate individual filter plugin directories +``` + +Existing pipelines are unaffected until their `filter.conf` is updated. +Both the old and new plugin can coexist during migration. + +--- + +## Questions for the Team + +- Is this direction aligned with the project's goals? +- Should `IGuardiumParser` live in the `common` module instead, to allow + parser JARs to be developed and deployed independently? +- Should parsers eventually be driven by config files (YAML field mappings) + for simple datasources, with Java only needed for complex ones? + +--- + +> Raised by [@haimofergmail](https://github.com/haimofergmail) — open to all feedback. diff --git a/filter-plugin/logstash-filter-guardium-universal/MongoDBOverSyslogPackage/MongoDB/filter.conf b/filter-plugin/logstash-filter-guardium-universal/MongoDBOverSyslogPackage/MongoDB/filter.conf new file mode 100644 index 000000000..f45d3581b --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/MongoDBOverSyslogPackage/MongoDB/filter.conf @@ -0,0 +1,19 @@ +# MongoDB audit logs via syslog — uses the universal filter plugin +filter { + if [type] == "syslog-mongodb" { + grok { + match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:server_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } + } + + if "_grokparsefailure" in [tags] { drop {} } + + mutate { + rename => { "host" => "server_ip" } + } + + # ---- Universal filter (replaces mongodb_guardium_filter{}) --------------- + guardium_universal_filter { + datasource => "mongodb" + } + } +} diff --git a/filter-plugin/logstash-filter-guardium-universal/MySQLOverSyslogPackage/MySQL/filter.conf 
b/filter-plugin/logstash-filter-guardium-universal/MySQLOverSyslogPackage/MySQL/filter.conf new file mode 100644 index 000000000..a0306b73e --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/MySQLOverSyslogPackage/MySQL/filter.conf @@ -0,0 +1,31 @@ +# MySQL audit logs via syslog — uses the universal filter plugin +# Previously required a dedicated logstash-filter-mysql-guardium plugin. +filter { + if [type] == "syslog-mysql" { + grok { + match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:server_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } + } + + if "_grokparsefailure" in [tags] { drop {} } + + date { + match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] + } + + mutate { + rename => { "host" => "server_ip" } + } + + # ---- Universal filter (replaces mysql_filter_guardium{}) ---------------- + guardium_universal_filter { + datasource => "mysql" + } + + if "_guardium_parse_error_mysql" not in [tags] { + mutate { + remove_field => ["message", "syslog_timestamp", "syslog_program", + "syslog_pid", "syslog_message", "type"] + } + } + } +} diff --git a/filter-plugin/logstash-filter-guardium-universal/SnowflakeOverJdbcPackage/Snowflake/filter.conf b/filter-plugin/logstash-filter-guardium-universal/SnowflakeOverJdbcPackage/Snowflake/filter.conf new file mode 100644 index 000000000..91e980dcf --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/SnowflakeOverJdbcPackage/Snowflake/filter.conf @@ -0,0 +1,7 @@ +# Snowflake audit logs via JDBC input — uses the universal filter plugin +filter { + # ---- Universal filter (replaces guardium_snowflake_filter{}) --------------- + guardium_universal_filter { + datasource => "snowflake" + } +} diff --git a/filter-plugin/logstash-filter-guardium-universal/VERSION b/filter-plugin/logstash-filter-guardium-universal/VERSION new file mode 100644 index 000000000..afaf360d3 --- /dev/null +++ 
b/filter-plugin/logstash-filter-guardium-universal/VERSION @@ -0,0 +1 @@ +1.0.0 \ No newline at end of file diff --git a/filter-plugin/logstash-filter-guardium-universal/build.gradle b/filter-plugin/logstash-filter-guardium-universal/build.gradle new file mode 100644 index 000000000..897b5481c --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/build.gradle @@ -0,0 +1,190 @@ +import java.nio.file.Files +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING + +apply plugin: 'java' +apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle" + +// =========================================================================== +// plugin info +// =========================================================================== +group 'com.ibm.guardium.universal' +version "${file("VERSION").text.trim()}" +description = "Universal Guardium filter — one plugin for all datasources" +pluginInfo.licenses = ['Apache-2.0'] +pluginInfo.longDescription = "Single Logstash filter plugin that replaces all individual " + + "logstash-filter-*-guardium plugins. Datasource is selected at runtime via the " + + "'datasource' configuration parameter." 
+pluginInfo.authors = ['IBM'] +pluginInfo.email = [''] +pluginInfo.homepage = "https://github.com/IBM/universal-connectors" +pluginInfo.pluginType = "filter" +pluginInfo.pluginClass = "GuardiumUniversalFilter" +pluginInfo.pluginName = "guardium_universal_filter" +// =========================================================================== + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +def jacocoVersion = '0.8.4' +def minimumCoverageStr = System.getenv("MINIMUM_COVERAGE") ?: "50.0%" +if (minimumCoverageStr.endsWith("%")) { + minimumCoverageStr = minimumCoverageStr.substring(0, minimumCoverageStr.length() - 1) +} +def minimumCoverage = Float.valueOf(minimumCoverageStr) / 100 + +buildscript { + repositories { + maven { url "https://plugins.gradle.org/m2/" } + mavenCentral() + jcenter() + } + dependencies { + classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.4' + classpath "org.barfuin.gradle.jacocolog:gradle-jacoco-log:3.0.0-RC2" + classpath group: 'org.yaml', name: 'snakeyaml', version: '2.2' + } +} + +def universalConnectorsDir = project.projectDir.parentFile?.parentFile.toString() +def versions = new org.yaml.snakeyaml.Yaml().load( + new File("${universalConnectorsDir}/versions.yml").newInputStream()) + +repositories { + mavenCentral() +} + +apply plugin: 'com.github.johnrengelman.shadow' +shadowJar { classifier = null } + +dependencies { + // ---- Guardium commons (Record structures, Util, GuardConstants) ---------- + implementation fileTree(dir: GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, + include: "common-*.*.*.jar") + + // ---- Logstash core ------------------------------------------------------- + implementation fileTree(dir: LOGSTASH_CORE_PATH, + include: "build/libs/logstash-core-*.*.*.jar") + + // ---- Shared runtime libraries (superset of all migrated parsers) --------- + implementation group: 'org.apache.logging.log4j', name: 'log4j-core', + version: versions.dependencies.log4jCore + implementation 'com.google.code.gson:gson:' + 
versions.dependencies.gson + implementation group: 'commons-validator', name: 'commons-validator', + version: versions.dependencies.commonsValidator + implementation 'org.apache.commons:commons-lang3:' + versions.dependencies.commonsLang + implementation group: 'commons-beanutils', name: 'commons-beanutils', + version: versions.dependencies.commonsBeanutils + + // ---- Phase-1 migration: depend on existing parser JARs until their ------- + // ---- source is moved into this plugin. ------------------------------------ + // TODO: remove these once each parser package is migrated here: + // implementation fileTree(dir: '../logstash-filter-mongodb-guardium/build/libs', include: '*.jar') + // implementation fileTree(dir: '../logstash-filter-snowflake-guardium/build/libs', include: '*.jar') + + // ---- Test ---------------------------------------------------------------- + testImplementation 'junit:junit:' + versions.dependencies.junit + testImplementation 'org.jruby:jruby-complete:' + versions.dependencies.jrubyComplete + testImplementation fileTree(dir: GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, + include: "common-*.*.*.jar") +} + +// ---- Standard Logstash plugin tasks (identical across all plugins) ---------- + +clean { + delete "${projectDir}/Gemfile" + delete "${projectDir}/" + pluginInfo.pluginFullName() + ".gemspec" + delete "${projectDir}/lib/" + delete "${projectDir}/vendor/" + new FileNameFinder().getFileNames(projectDir.toString(), + pluginInfo.pluginFullName() + "-*.*.*.gem").each { delete it } +} + +tasks.withType(JavaCompile) { options.encoding = 'UTF-8' } + +tasks.register("vendor") { + dependsOn shadowJar + doLast { + String vendorPathPrefix = "vendor/jar-dependencies" + String projectGroupPath = project.group.replaceAll('\\.', '/') + File projectJarFile = file("${vendorPathPrefix}/${projectGroupPath}/" + + "${pluginInfo.pluginFullName()}/${project.version}/" + + "${pluginInfo.pluginFullName()}-${project.version}.jar") + projectJarFile.mkdirs() + 
Files.copy(file("$buildDir/libs/${project.name}-${project.version}.jar").toPath(), + projectJarFile.toPath(), REPLACE_EXISTING) + validatePluginJar(projectJarFile, project.group) + } +} + +tasks.register("generateRubySupportFiles") { + doLast { + generateRubySupportFilesForPlugin(project.description, project.group, version) + } +} + +tasks.register("removeObsoleteJars") { + doLast { + new FileNameFinder().getFileNames( + projectDir.toString(), + "vendor/**/" + pluginInfo.pluginFullName() + "*.jar", + "vendor/**/" + pluginInfo.pluginFullName() + "-" + version + ".jar" + ).each { f -> delete f } + } +} + +tasks.register("gem") { + dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles] + doLast { + buildGem(projectDir, buildDir, pluginInfo.pluginFullName() + ".gemspec") + } +} + +tasks.register("copyDependencyLibs", Copy) { + into "dependenciesLib" + from configurations.compileClasspath + from configurations.runtimeClasspath + from configurations.testCompileClasspath + from configurations.testRuntimeClasspath +} + +apply plugin: 'jacoco' +apply plugin: "org.barfuin.gradle.jacocolog" + +jacoco { + toolVersion = "${jacocoVersion}" + reportsDir = file("$buildDir/reports/jacoco") +} + +jacocoTestReport { + reports { + html.enabled true + xml.enabled true + csv.enabled true + html.destination file("${buildDir}/reports/jacoco") + csv.destination file("${buildDir}/reports/jacoco/all.csv") + } + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } + doLast { println "Report -> file://${buildDir}/reports/jacoco/index.html" } +} + +test.finalizedBy jacocoTestReport + +jacocoTestCoverageVerification { + violationRules { + rule { + limit { minimum = minimumCoverage } + } + } + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + afterEvaluate { + 
classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } +} +project.tasks.check.dependsOn(jacocoTestCoverageVerification, jacocoTestReport) diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/GuardiumUniversalFilter.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/GuardiumUniversalFilter.java new file mode 100644 index 000000000..a9e2a29aa --- /dev/null +++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/GuardiumUniversalFilter.java @@ -0,0 +1,134 @@ +/* + * Copyright 2024 IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache2.0 + */ +package com.ibm.guardium.universal; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Event; +import co.elastic.logstash.api.Filter; +import co.elastic.logstash.api.FilterMatchListener; +import co.elastic.logstash.api.LogstashPlugin; +import co.elastic.logstash.api.PluginConfigSpec; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.ibm.guardium.universal.parser.IGuardiumParser; +import com.ibm.guardium.universal.parser.ParserRegistry; +import com.ibm.guardium.universalconnector.commons.GuardConstants; +import com.ibm.guardium.universalconnector.commons.structures.Record; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +/** + * Single generic Logstash filter plugin that replaces all individual + * {@code logstash-filter-*-guardium} plugins. + * + *
+ * <p>Usage in {@code filter.conf}:
+ *
+ * <pre>{@code
+ * filter {
+ *   guardium_universal_filter {
+ *     datasource => "mysql"   # or "mongodb", "snowflake", "postgres", ...
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>This class contains ZERO datasource-specific logic. All parsing is
+ * delegated to the {@link IGuardiumParser} registered for the chosen datasource.
+ * Adding a new datasource means writing one parser class and one line in
+ * {@link ParserRegistry} — no new Logstash plugin, no new gem, no new build.
+ */
+@LogstashPlugin(name = "guardium_universal_filter")
+public class GuardiumUniversalFilter implements Filter {
+
+ // ---- Config specs --------------------------------------------------------
+
+ /** Which datasource this pipeline handles, e.g. "mysql", "mongodb". */
+    public static final PluginConfigSpec<String> DATASOURCE_CONFIG =
+            PluginConfigSpec.requiredStringSetting("datasource");
+
+    private static final Logger LOGGER = LogManager.getLogger(GuardiumUniversalFilter.class);
+    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+
+    private final String id;
+    private final String datasource;
+    private final IGuardiumParser parser;
+
+    public GuardiumUniversalFilter(String id, Configuration config, Context context) {
+        this.id = id;
+        this.datasource = config.get(DATASOURCE_CONFIG);
+        this.parser = ParserRegistry.getParser(datasource);
+        if (this.parser == null) {
+            throw new IllegalArgumentException("Unknown datasource: " + datasource);
+        }
+    }
+
+    @Override
+    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
+        List<Event> skipped = new ArrayList<>();
+        for (Event event : events) {
+            try {
+                Record record = parser.parseRecord(event);
+                if (record == null) {
+                    // Not an auditable event for this datasource — drop silently.
+                    skipped.add(event);
+                    continue;
+                }
+                event.setField(GuardConstants.GUARDIUM_RECORD_FIELD_NAME, GSON.toJson(record));
+                matchListener.filterMatched(event);
+            } catch (Exception e) {
+                LOGGER.error("Failed to parse event for datasource " + datasource, e);
+                event.tag("_guardium_parse_error_" + datasource);
+            }
+        }
+        events.removeAll(skipped);
+        return events;
+    }
+
+    @Override
+    public Collection<PluginConfigSpec<?>> configSchema() {
+        return Arrays.asList(DATASOURCE_CONFIG);
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+}
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mongodb/MongoDbParser.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mongodb/MongoDbParser.java
new file mode 100644
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mongodb/MongoDbParser.java
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.datasources.mongodb;
+
+import co.elastic.logstash.api.Event;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.ibm.guardium.universal.parser.AbstractGuardiumParser;
+import com.ibm.guardium.universalconnector.commons.structures.Record;
+
+import java.util.Optional;
+
+/**
+ * Thin connector for MongoDB audit log parsing.
+ *
+ * <p>Migration note: This class delegates to the parser hierarchy that
+ * currently lives in {@code logstash-filter-mongodb-guardium}. As part of the
+ * full migration to the universal plugin, the following packages should be moved
+ * into this plugin (they have no Logstash dependency — only common + Gson):
+ * {@code com.ibm.guardium.mongodb} and {@code com.ibm.guardium.mongodb.parsersbytype}.
+ *
+ * <p>Until those packages are moved, include the mongodb plugin's JAR as a
+ * compile-time dependency in {@code build.gradle}.
+ *
+ * All enrichment logic previously scattered across {@code MongodbGuardiumFilter}
+ * (server_hostname, source_program, dbname_prefix, server port, IP correction)
+ * is consolidated here.
+ */
+public class MongoDbParser extends AbstractGuardiumParser {
+
+ private static final String MONGOD_SIGNAL = "mongod: ";
+ private static final String MONGOS_SIGNAL = "mongos: ";
+
+ @Override
+ public Record parseRecord(Event event) throws Exception {
+ if (!(event.getField("message") instanceof String)) return null;
+
+ String message = event.getField("message").toString();
+
+ // Skip internal MongoDB system events
+ if (message.contains("__system") || message.contains("\"c\":\"CONTROL\"")) {
+ return null;
+ }
+
+ // Locate the JSON audit payload after "mongod: " or "mongos: "
+ int idx = message.indexOf(MONGOD_SIGNAL);
+ if (idx == -1) idx = message.indexOf(MONGOS_SIGNAL);
+ if (idx == -1) return null;
+
+ String json = message.substring(idx + MONGOD_SIGNAL.length());
+ JsonObject data = JsonParser.parseString(json).getAsJsonObject();
+
+ // --- Delegate to the existing MongoDB parser factory ---
+ // TODO (migration): move com.ibm.guardium.mongodb.* into this plugin
+ com.ibm.guardium.mongodb.parsersbytype.BaseParser mongoParser =
+ com.ibm.guardium.mongodb.ParserFactory.getParser(data);
+ Record record = mongoParser.parseRecord(data);
+ if (record == null) return null;
+
+ // Enrich with Logstash event fields (previously in MongodbGuardiumFilter)
+ if (event.getField("server_hostname") instanceof String) {
+ record.getAccessor().setServerHostName(
+ event.getField("server_hostname").toString());
+ }
+ Optional.ofNullable(event.getField("source_program"))
+ .map(Object::toString)
+ .ifPresent(sp -> record.getAccessor().setSourceProgram(sp));
+
+ if (event.getField("icd_default_serverport") instanceof String) {
+ record.getSessionLocator().setServerPort(
+ Integer.parseInt(event.getField("icd_default_serverport").toString()));
+ }
+
+ if (event.getField("dbname_prefix") instanceof String) {
+ String prefix = event.getField("dbname_prefix").toString();
+ if (!prefix.isEmpty()) {
+                String db = record.getDbName();
+                String combined = (db == null || db.isEmpty()) ? prefix : prefix + ":" + db;
+ record.setDbName(combined);
+ record.getAccessor().setServiceName(combined);
+ }
+ }
+
+ correctIPs(event, record);
+ return record;
+ }
+}
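A note on the substring arithmetic above: reusing `MONGOD_SIGNAL.length()` after a `MONGOS_SIGNAL` match only works because both markers happen to be eight characters long. A self-contained sketch of an equivalent, length-safe extraction (the message text is invented for the demo):

```java
import java.util.Arrays;
import java.util.List;

public class SignalExtractDemo {
    // Iterating signals and using each signal's own length avoids any
    // dependency on the markers having equal lengths.
    private static final List<String> SIGNALS = Arrays.asList("mongod: ", "mongos: ");

    /** Returns the JSON payload after the first matching signal, or null. */
    static String extractJson(String message) {
        for (String signal : SIGNALS) {
            int idx = message.indexOf(signal);
            if (idx != -1) {
                return message.substring(idx + signal.length());
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String msg = "Feb  1 10:00:00 host mongos: {\"atype\":\"authenticate\"}";
        System.out.println(extractJson(msg));
    }
}
```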
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mysql/MySqlParser.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mysql/MySqlParser.java
new file mode 100644
index 000000000..d705f0bb8
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/mysql/MySqlParser.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.datasources.mysql;
+
+import co.elastic.logstash.api.Event;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.ibm.guardium.universal.parser.AbstractGuardiumParser;
+import com.ibm.guardium.universalconnector.commons.Util;
+import com.ibm.guardium.universalconnector.commons.structures.*;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
+
+/**
+ * Parses MySQL audit log events into Guardium Records.
+ *
+ * Migrated from {@code MySqlFilterGuardium} — parsing logic is identical;
+ * all Logstash plugin boilerplate has been removed.
+ *
+ * Supported log formats: MySQL audit log plugin (JSON) over syslog or Filebeat.
+ */
+public class MySqlParser extends AbstractGuardiumParser {
+
+ // ---- Message format constants --------------------------------------------
+
+ private static final String AUDIT_START_SIGNAL = "mysql_audit_log: ";
+
+ private static final String CLASS_CONNECTION = "connection";
+ private static final String CLASS_GENERAL = "general";
+ private static final String DATA_CONNECTION = "connection_data";
+ private static final String DATA_GENERAL = "general_data";
+
+ private static final String DB_PROTOCOL = "MySQL";
+ private static final String SERVER_TYPE = "MySql";
+ private static final String LANGUAGE = "MYSQL";
+ private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ // SimpleDateFormat is not thread-safe; guard with synchronized helper.
+ private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat(DATE_FORMAT);
+
+ // ---- IGuardiumParser implementation -------------------------------------
+
+ @Override
+ public Record parseRecord(Event event) throws Exception {
+        // Skip events without a string message (mirrors the MongoDB parser's guard)
+        if (!(event.getField("message") instanceof String)) return null;
+        String message = event.getField("message").toString();
+
+ int idx = message.indexOf(AUDIT_START_SIGNAL);
+ if (idx == -1) return null;
+
+ String jsonStr = message.substring(idx + AUDIT_START_SIGNAL.length());
+ // Remove trailing comma if present (MySQL audit log quirk)
+ if (jsonStr.endsWith(",")) {
+ jsonStr = jsonStr.substring(0, jsonStr.length() - 1);
+ }
+
+ JsonObject data = JsonParser.parseString(jsonStr).getAsJsonObject();
+ String classType = data.get("class").getAsString();
+ String timestamp = data.get("timestamp").getAsString();
+
+ if (!CLASS_CONNECTION.equals(classType) && !CLASS_GENERAL.equals(classType)) {
+ return null; // skip audit/table_access class events
+ }
+
+ Record record = new Record();
+ record.setDbName(UNKNOWN);
+ String eventField = data.get("event").getAsString();
+ boolean validRecord = false;
+
+ if (data.has(DATA_CONNECTION) && "connect".equals(eventField)) {
+ JsonObject connData = data.getAsJsonObject(DATA_CONNECTION);
+ int status = connData.get("status").getAsInt();
+ String dbName = connData.get("db").getAsString();
+ record.setDbName(dbName);
+
+ if (!dbName.isEmpty() && status == 0) {
+ record.setAccessor(buildAccessor(event, data));
+ validRecord = true;
+ } else if (status != 0) {
+ ExceptionRecord ex = new ExceptionRecord();
+ ex.setExceptionTypeId("LOGIN_FAILED");
+ ex.setDescription("Login Failed (" + status + ")");
+ ex.setSqlString(UNKNOWN);
+ record.setException(ex);
+ validRecord = true;
+ }
+
+ } else if (data.has(DATA_GENERAL)) {
+ JsonObject genData = data.getAsJsonObject(DATA_GENERAL);
+ String command = genData.get("command").getAsString();
+ int status = genData.get("status").getAsInt();
+
+ if ("Query".equals(command)) {
+ String query = genData.get("query").getAsString();
+ if (status != 0) {
+ ExceptionRecord ex = new ExceptionRecord();
+ ex.setExceptionTypeId("SQL_ERROR");
+ ex.setDescription("Error (" + status + ")");
+ ex.setSqlString(query);
+ record.setException(ex);
+ } else if (query != null) {
+ Data d = new Data();
+ d.setOriginalSqlCommand(query);
+ record.setData(d);
+ }
+ validRecord = true;
+ }
+ }
+
+ if (!validRecord) return null;
+
+ record.setSessionId(data.has("connection_id") && !data.get("connection_id").isJsonNull()
+ ? String.valueOf(data.get("connection_id").getAsInt())
+ : UNKNOWN);
+ record.setAppUserName(UNKNOWN);
+ record.setTime(parseTime(timestamp));
+ record.setSessionLocator(buildSessionLocator(event, data));
+ record.setAccessor(buildAccessor(event, data));
+ correctIPs(event, record);
+
+ return record;
+ }
+
+ // ---- Helpers -------------------------------------------------------------
+
+ private static synchronized Time parseTime(String ts) throws ParseException {
+ Date date = DATE_FORMATTER.parse(ts);
+ ZonedDateTime zdt = ZonedDateTime.ofInstant(date.toInstant(), ZoneId.of("UTC"));
+ return new Time(zdt.toInstant().toEpochMilli(),
+ zdt.getOffset().getTotalSeconds() / 60, 0);
+ }
+
+ private static SessionLocator buildSessionLocator(Event event, JsonObject data) {
+ SessionLocator loc = new SessionLocator();
+
+ String serverIp = event.getField("server_ip") instanceof String
+ ? event.getField("server_ip").toString() : "0.0.0.0";
+
+ if (Util.isIPv6(serverIp)) {
+ loc.setServerIpv6(serverIp);
+ loc.setServerIp(UNKNOWN);
+ loc.setIpv6(true);
+ } else {
+ loc.setServerIp(serverIp);
+ loc.setServerIpv6(UNKNOWN);
+ loc.setIpv6(false);
+ }
+ loc.setServerPort(SessionLocator.PORT_DEFAULT);
+ loc.setClientIp(UNKNOWN);
+ loc.setClientIpv6(UNKNOWN);
+ loc.setClientPort(SessionLocator.PORT_DEFAULT);
+
+ if (data.has("login")) {
+ JsonObject login = data.getAsJsonObject("login");
+ String addr = login.get("ip").getAsString();
+ if (Util.isIPv6(addr)) {
+ loc.setClientIpv6(addr);
+ loc.setClientIp(UNKNOWN);
+ loc.setIpv6(true);
+ } else {
+ loc.setClientIp(addr);
+ loc.setClientIpv6(UNKNOWN);
+ }
+ }
+ return loc;
+ }
+
+ private static Accessor buildAccessor(Event event, JsonObject data) {
+ Accessor acc = new Accessor();
+ acc.setDbProtocol(DB_PROTOCOL);
+ acc.setServerType(SERVER_TYPE);
+ acc.setLanguage(LANGUAGE);
+ acc.setDataType(Accessor.DATA_TYPE_GUARDIUM_SHOULD_PARSE_SQL);
+
+ acc.setServerHostName(event.getField("server_hostname") instanceof String
+ ? event.getField("server_hostname").toString() : UNKNOWN);
+
+ if (data.has("account")) {
+ String user = data.getAsJsonObject("account").get("user").getAsString();
+ acc.setDbUser(user == null || user.isEmpty() ? "NA" : user);
+ }
+
+ String osUser = UNKNOWN, osStr = UNKNOWN;
+ if (data.has(DATA_CONNECTION)) {
+ JsonObject connData = data.getAsJsonObject(DATA_CONNECTION);
+ if (connData.has("connection_attributes")) {
+ JsonObject attrs = connData.getAsJsonObject("connection_attributes");
+ if (attrs.has("_os")) osStr = attrs.get("_os").getAsString();
+ if (attrs.has("os_user")) osUser = attrs.get("os_user").getAsString();
+ }
+ if ("connect".equals(data.get("event").getAsString())) {
+ acc.setServiceName(connData.get("db").getAsString());
+ }
+ }
+ if (acc.getServiceName() == null) acc.setServiceName(UNKNOWN);
+
+ acc.setOsUser(osUser);
+ acc.setServerOs(osStr);
+ acc.setSourceProgram(UNKNOWN);
+ acc.setClient_mac(UNKNOWN);
+ acc.setClientHostName(UNKNOWN);
+ acc.setClientOs(UNKNOWN);
+ acc.setCommProtocol(UNKNOWN);
+ acc.setDbProtocolVersion(UNKNOWN);
+ acc.setServerDescription(UNKNOWN);
+ return acc;
+ }
+}
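The synchronized `SimpleDateFormat` above works, but a thread-safe alternative using `java.time` avoids the lock entirely. A sketch under the same assumption the parser makes (timestamps are UTC in `yyyy-MM-dd HH:mm:ss` form):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimeParseDemo {
    // DateTimeFormatter is immutable and thread-safe, so no synchronization
    // is needed, unlike SimpleDateFormat.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Epoch milliseconds for a UTC audit timestamp. */
    static long toEpochMillis(String ts) {
        return LocalDateTime.parse(ts, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2024-01-01 00:00:00"));
    }
}
```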
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/snowflake/SnowflakeParser.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/snowflake/SnowflakeParser.java
new file mode 100644
index 000000000..8b0e8caf9
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/datasources/snowflake/SnowflakeParser.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.datasources.snowflake;
+
+import co.elastic.logstash.api.Event;
+import com.ibm.guardium.universal.parser.AbstractGuardiumParser;
+import com.ibm.guardium.universalconnector.commons.structures.Record;
+
+import java.util.Optional;
+
+/**
+ * Thin connector for Snowflake audit log parsing.
+ *
+ * Migration note: This class delegates to event-type parsers that
+ * currently live in {@code logstash-filter-snowflake-guardium}. As part of the
+ * full migration, the following packages should be moved into this plugin:
+ * {@code com.ibm.guardium.snowflakedb.parser} and
+ * {@code com.ibm.guardium.snowflakedb.utils}.
+ *
+ * <p>All logic previously in {@code GuardiumSnowflakeFilter} is consolidated here.
+ */
+public class SnowflakeParser extends AbstractGuardiumParser {
+
+ // TODO (migration): move com.ibm.guardium.snowflakedb.* into this plugin
+ private static final String EVENT_TYPE_FIELD = com.ibm.guardium.snowflakedb.utils.Constants.EVENT_TYPE;
+
+ @Override
+ public Record parseRecord(Event event) throws Exception {
+ String eventType = Optional.ofNullable(event.getField(EVENT_TYPE_FIELD))
+ .map(Object::toString).orElse(null);
+ if (eventType == null) return null;
+
+ com.ibm.guardium.snowflakedb.parser.Parser parser;
+ switch (eventType.toUpperCase()) {
+ case com.ibm.guardium.snowflakedb.utils.Constants.SQL_ERROR:
+ parser = new com.ibm.guardium.snowflakedb.parser.SQLErrorEventParser();
+ break;
+ case com.ibm.guardium.snowflakedb.utils.Constants.LOGIN_FAILED:
+ parser = new com.ibm.guardium.snowflakedb.parser.AuthFailedEventParser();
+ break;
+ case com.ibm.guardium.snowflakedb.utils.Constants.SUCCESS:
+ parser = new com.ibm.guardium.snowflakedb.parser.SuccessEventParser();
+ break;
+ default:
+ return null;
+ }
+
+ Record record = parser.parseRecord(event.toMap());
+
+ // Skip events with no resolved user (same guard as original filter)
+ String dbUser = record.getAccessor().getDbUser();
+ if (dbUser == null || dbUser.isEmpty()) return null;
+
+ return record;
+ }
+}
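The switch above mirrors the existing Snowflake plugin's dispatch style. An equivalent map-based dispatch (illustrative only, with string stand-ins for the real `SQLErrorEventParser`, `AuthFailedEventParser`, and `SuccessEventParser` classes) shows how new event types stay one-line additions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchDemo {
    // Stand-ins for the real per-event-type parser classes.
    private static final Map<String, Function<String, String>> PARSERS = new HashMap<>();
    static {
        PARSERS.put("SQL_ERROR", payload -> "sql-error:" + payload);
        PARSERS.put("LOGIN_FAILED", payload -> "login-failed:" + payload);
        PARSERS.put("SUCCESS", payload -> "success:" + payload);
    }

    /** Dispatches by event type; returns null for unknown types (event skipped). */
    static String dispatch(String eventType, String payload) {
        Function<String, String> parser =
                eventType == null ? null : PARSERS.get(eventType.toUpperCase());
        return parser == null ? null : parser.apply(payload);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("sql_error", "select 1"));
    }
}
```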
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/AbstractGuardiumParser.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/AbstractGuardiumParser.java
new file mode 100644
index 000000000..ded368016
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/AbstractGuardiumParser.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.parser;
+
+import co.elastic.logstash.api.Event;
+import com.ibm.guardium.universalconnector.commons.Util;
+import com.ibm.guardium.universalconnector.commons.structures.Record;
+import com.ibm.guardium.universalconnector.commons.structures.SessionLocator;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Optional base class providing utilities shared across many datasource parsers.
+ *
+ * Parsers may extend this class to avoid duplicating IP-correction and
+ * other common logic, but implementing {@link IGuardiumParser} directly is
+ * also fine.
+ */
+public abstract class AbstractGuardiumParser implements IGuardiumParser {
+
+ protected static final String UNKNOWN = "";
+ protected static final int DEFAULT_PORT = SessionLocator.PORT_DEFAULT;
+
+    private static final Set<String> LOCAL_IPS =
+            new HashSet<>(Arrays.asList("127.0.0.1", "0:0:0:0:0:0:0:1"));
+
+    /**
+     * Replaces loopback client/server IPs in the session locator with the real
+     * server IP; this is the correctIPs() utility currently duplicated in
+     * every individual plugin.
+     */
+    protected void correctIPs(Event event, Record record) {
+        SessionLocator locator = record.getSessionLocator();
+        Object serverIp = event.getField("server_ip");
+        if (locator == null || serverIp == null) {
+            return;
+        }
+        if (LOCAL_IPS.contains(locator.getServerIp())) {
+            locator.setServerIp(serverIp.toString());
+        }
+        if (LOCAL_IPS.contains(locator.getClientIp())) {
+            locator.setClientIp(serverIp.toString());
+        }
+    }
+}
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/ParserRegistry.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/ParserRegistry.java
new file mode 100644
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/ParserRegistry.java
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.parser;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Maps datasource names (the value of the plugin's config option) to parser
+ * instances.
+ *
+ * Implementations contain ONLY the data-source-specific parsing logic.
+ * All Logstash plugin boilerplate (registration, event loop, GSON, Log4j,
+ * error tagging) is handled once in {@link com.ibm.guardium.universal.GuardiumUniversalFilter}.
+ *
+ * Adding a new datasource requires exactly one line here plus the parser class itself.
+ * No Logstash plugin scaffolding, build files, or gem packaging are needed.
+ */
+public class ParserRegistry {
+
+ // Case-insensitive map so "MySQL", "mysql", and "MYSQL" all resolve.
+    private static final Map<String, IGuardiumParser> PARSERS =
+            new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+    static {
+        register("mysql", new MySqlParser());
+        register("snowflake", new SnowflakeParser());
+        // ... one registration line per datasource
+    }
+
+    /** Registers (or replaces) the parser for a datasource name. */
+    public static void register(String datasource, IGuardiumParser parser) {
+        PARSERS.put(datasource, parser);
+    }
+
+    /**
+     * Resolves the parser for a datasource name.
+     *
+     * @throws IllegalArgumentException if no parser is registered under the name
+     */
+    public static IGuardiumParser getParser(String datasource) {
+        IGuardiumParser parser = datasource == null ? null : PARSERS.get(datasource);
+        if (parser == null) {
+            throw new IllegalArgumentException("Unknown datasource: " + datasource);
+        }
+        return parser;
+    }
+}
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/test/java/com/ibm/guardium/universal/GuardiumUniversalFilterTest.java b/filter-plugin/logstash-filter-guardium-universal/src/test/java/com/ibm/guardium/universal/GuardiumUniversalFilterTest.java
new file mode 100644
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/test/java/com/ibm/guardium/universal/GuardiumUniversalFilterTest.java
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal;
+
+import co.elastic.logstash.api.Event;
+import com.ibm.guardium.universal.parser.*;
+import com.ibm.guardium.universalconnector.commons.structures.Record;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the generic plugin plumbing: registry resolution and the filter
+ * lifecycle.
+ *
+ * Datasource-specific parsing is tested in the parser's own test class
+ * (e.g., {@code MySqlParserTest}), keeping these tests focused on the
+ * generic plugin lifecycle.
+ */
+public class GuardiumUniversalFilterTest {
+
+ // ---- ParserRegistry tests ------------------------------------------------
+
+ @Test
+ public void testRegistry_mySqlIsRegistered() {
+ IGuardiumParser parser = ParserRegistry.getParser("mysql");
+ Assert.assertNotNull(parser);
+ Assert.assertTrue(parser instanceof MySqlParser);
+ }
+
+ @Test
+ public void testRegistry_caseInsensitive() {
+ IGuardiumParser p1 = ParserRegistry.getParser("MySQL");
+ IGuardiumParser p2 = ParserRegistry.getParser("mysql");
+ IGuardiumParser p3 = ParserRegistry.getParser("MYSQL");
+ Assert.assertSame(p1.getClass(), p2.getClass());
+ Assert.assertSame(p2.getClass(), p3.getClass());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRegistry_unknownDatasourceThrows() {
+ ParserRegistry.getParser("nonexistent-db-12345");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRegistry_emptyDatasourceThrows() {
+ ParserRegistry.getParser("");
+ }
+
+ @Test
+ public void testRegistry_customParserCanBeRegistered() {
+ IGuardiumParser stub = event -> null;
+ ParserRegistry.register("test-stub", stub);
+ Assert.assertSame(stub, ParserRegistry.getParser("test-stub"));
+ }
+
+ // ---- MySqlParser unit tests ----------------------------------------------
+
+ private static final String MYSQL_QUERY_EVENT =
+ "Mar 10 12:00:00 db-host mysqld: mysql_audit_log: "
+ + "{\"timestamp\":\"2024-03-10 12:00:00\",\"id\":1,"
+ + "\"class\":\"general\",\"event\":\"query\","
+ + "\"connection_id\":42,"
+ + "\"account\":{\"user\":\"admin\",\"host\":\"localhost\"},"
+ + "\"login\":{\"user\":\"admin\",\"os\":\"\",\"ip\":\"10.0.0.1\",\"proxy\":\"\"},"
+ + "\"general_data\":{\"command\":\"Query\",\"sql_command\":\"select\","
+ + "\"query\":\"SELECT 1\",\"status\":0}}";
+
+ private static final String MYSQL_LOGIN_FAILED_EVENT =
+ "mysql_audit_log: "
+ + "{\"timestamp\":\"2024-03-10 12:00:00\",\"id\":2,"
+ + "\"class\":\"connection\",\"event\":\"connect\","
+ + "\"connection_id\":43,"
+ + "\"account\":{\"user\":\"baduser\",\"host\":\"localhost\"},"
+ + "\"login\":{\"user\":\"baduser\",\"os\":\"\",\"ip\":\"10.0.0.2\",\"proxy\":\"\"},"
+ + "\"connection_data\":{\"connection_type\":\"ssl\",\"status\":1,"
+ + "\"db\":\"testdb\",\"connection_attributes\":{}}}";
+
+ @Test
+ public void testMySqlParser_successfulQuery() throws Exception {
+ MySqlParser parser = new MySqlParser();
+ Event event = new org.logstash.Event();
+ event.setField("message", MYSQL_QUERY_EVENT);
+ event.setField("server_ip", "192.168.1.10");
+ event.setField("server_hostname", "db-host");
+
+ Record record = parser.parseRecord(event);
+
+ Assert.assertNotNull("Parser should return a Record for a valid query event", record);
+ Assert.assertNotNull("Data should be set for a successful query", record.getData());
+ Assert.assertEquals("SELECT 1", record.getData().getOriginalSqlCommand());
+ Assert.assertEquals("42", record.getSessionId());
+ Assert.assertEquals("admin", record.getAccessor().getDbUser());
+ }
+
+ @Test
+ public void testMySqlParser_loginFailed() throws Exception {
+ MySqlParser parser = new MySqlParser();
+ Event event = new org.logstash.Event();
+ event.setField("message", MYSQL_LOGIN_FAILED_EVENT);
+ event.setField("server_ip", "192.168.1.10");
+
+ Record record = parser.parseRecord(event);
+
+ Assert.assertNotNull(record);
+ Assert.assertNotNull("Exception should be set for login failure", record.getException());
+ Assert.assertEquals("LOGIN_FAILED", record.getException().getExceptionTypeId());
+ }
+
+ @Test
+ public void testMySqlParser_irrelevantMessage_returnsNull() throws Exception {
+ MySqlParser parser = new MySqlParser();
+ Event event = new org.logstash.Event();
+ event.setField("message", "some unrelated syslog message without mysql prefix");
+
+ Record record = parser.parseRecord(event);
+
+ Assert.assertNull("Parser should return null for non-MySQL messages", record);
+ }
+}
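
The registry pattern these tests exercise boils down to a case-insensitive map plus a one-method interface. A self-contained, runnable sketch with simplified stand-in types (`Parser` instead of `IGuardiumParser`, plain strings instead of `Record` objects):

```java
import java.util.Map;
import java.util.TreeMap;

// Self-contained sketch of the registry pattern; all names here are
// simplified stand-ins, not the proposal's real types.
public class RegistrySketch {
    interface Parser { String parse(String rawEvent); }

    // Case-insensitive, as in ParserRegistry: "MySQL" and "mysql" resolve alike.
    static final Map<String, Parser> PARSERS = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

    static {
        // One registration line per datasource.
        PARSERS.put("mysql", raw -> "mysql-record:" + raw);
    }

    static Parser getParser(String name) {
        Parser p = (name == null) ? null : PARSERS.get(name);
        if (p == null) throw new IllegalArgumentException("Unknown datasource: " + name);
        return p;
    }

    public static void main(String[] args) {
        // Lookups are case-insensitive, so config values need no normalization.
        System.out.println(getParser("MySQL").parse("SELECT 1")); // mysql-record:SELECT 1
    }
}
```

`TreeMap` with `String.CASE_INSENSITIVE_ORDER` gives the case-insensitive lookup for free, which is why no `toLowerCase()` normalization appears anywhere in the proposed registry.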
diff --git a/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/IGuardiumParser.java b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/IGuardiumParser.java
new file mode 100644
--- /dev/null
+++ b/filter-plugin/logstash-filter-guardium-universal/src/main/java/com/ibm/guardium/universal/parser/IGuardiumParser.java
+/*
+ * Copyright 2024 IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package com.ibm.guardium.universal.parser;
+
+import co.elastic.logstash.api.Event;
+import com.ibm.guardium.universalconnector.commons.structures.Record;
+
+/**
+ * The single contract a datasource must implement to plug into the universal
+ * filter.
+ *
+ * Adding a new datasource means implementing this interface (optionally via
+ * {@link AbstractGuardiumParser}) and registering the implementation in
+ * {@link ParserRegistry}; no Logstash plugin scaffolding is required.
+ */
+public interface IGuardiumParser {
+
+ /**
+ * Parse a Logstash event into a Guardium {@link Record}.
+ *
+ * @param event the Logstash event containing raw log data
+ * @return a fully populated Record, or {@code null} to silently skip the event
+ * @throws Exception if the event is malformed; the generic filter will tag it
+ * as {@code _guardium_parse_error_