// Create (or reuse) the Spark session for this application.
var sessionBuilder = SparkSession.Builder().AppName("Apache User Log Processing");
SparkSession spark = sessionBuilder.GetOrCreate();

// Load the input as text; later queries address each line through the "value" column.
var reader = spark.Read();
DataFrame generalDf = reader.Text("<path to input data set>");
// Pattern for one Apache access-log entry. Capture groups, in order:
//   1-3: three whitespace-free fields, 4: bracketed timestamp with UTC offset,
//   5-7: the three tokens of the quoted request (5 is the method),
//   8: three-digit status code, 9: numeric size.
// Backslashes are doubled so the C# compiler passes them through to the regex engine,
// and the literal '[' / ']' around the timestamp are escaped so they are not read as a
// character class.
string s_apacheRx = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)";
我們如何對 DataFrame 的每一行執行計算,例如將每一個日誌條目與上面的 s_apacheRx 進行匹配?答案是 Spark SQL。
// Register a UDF that reports whether a raw line matches the Apache log pattern.
spark.Udf().Register<string, bool>("GeneralReg", log => Regex.IsMatch(log, s_apacheRx));

// The query below selects FROM Logs, so the raw lines must be registered under that
// view name before the query runs.
generalDf.CreateOrReplaceTempView("Logs");

// Assign to the existing variable (declaring generalDf a second time does not compile).
generalDf = spark.Sql("SELECT logs.value, GeneralReg(logs.value) FROM Logs");

// Keep only the rows whose UDF column is true, then print them.
generalDf = generalDf.Filter(generalDf["GeneralReg(value)"]);
generalDf.Show();
// NOTE: these statements were previously collapsed onto a single line that began
// with "//", which turned all of them into one comment. They are restored here as
// separate statements.

// Choose valid log entries that start with 10
spark.Udf().Register<string, bool>(
    "IPReg",
    log => Regex.IsMatch(log, "^(?=10)"));

generalDf.CreateOrReplaceTempView("IPLogs");

// Apply UDF to get valid log entries starting with 10
DataFrame ipDf = spark.Sql(
    "SELECT iplogs.value FROM IPLogs WHERE IPReg(iplogs.value)");
ipDf.Show();

// Choose valid log entries that start with 10 and deal with spam
spark.Udf().Register<string, bool>(
    "SpamRegEx",
    log => Regex.IsMatch(log, "\\b(?=spam)\\b"));

ipDf.CreateOrReplaceTempView("SpamLogs");

// Apply UDF to get valid, start with 10, spam entries
DataFrame spamDF = spark.Sql(
    "SELECT spamlogs.value FROM SpamLogs WHERE SpamRegEx(spamlogs.value)");
// Bring the filtered rows back to the driver and count those whose "value" column
// is recognised by ContainsGet.
int numGetRequests = spamDF
    .Collect()
    .Count(row => ContainsGet(row.GetAs<string>("value")));
/// <summary>
/// Reports whether <paramref name="logLine"/> matches the log pattern and its
/// method field is exactly "GET".
/// </summary>
/// <remarks>
/// Regex matching is used to group the data: each group matches a column in the log
/// schema, i.e. first group = first column = IP. This method was previously collapsed
/// onto a single line that began with "//", which commented out the whole definition.
/// </remarks>
/// <param name="logLine">One raw log line.</param>
/// <returns>
/// <c>true</c> when the line matches and group 5 equals "GET"; otherwise <c>false</c>.
/// </returns>
public static bool ContainsGet(string logLine)
{
    Match match = Regex.Match(logLine, s_apacheRx);

    // Lines that do not match the pattern are never counted as GET requests.
    if (!match.Success)
    {
        return false;
    }

    Console.WriteLine("Full log entry: '{0}'", match.Groups[0].Value);

    // 5th column/group in schema is "method"
    return match.Groups[5].Value == "GET";
}
# Run the compiled LoggingApp.dll through the DotnetRunner class on a local master.
# Replace the <version> placeholders with the real versions before running.
spark-submit \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local \
    /path/to/microsoft-spark-<version>.jar \
    dotnet /path/to/netcoreapp<version>/LoggingApp.dll
// Builds the query text by splicing std_name directly into the SQL string, so the
// statement that runs depends entirely on that value.
// NOTE(review): if std_name can come from untrusted input this is open to SQL
// injection; prefer a parameterised query in real code.
var mySqlQuery = $"SELECT * FROM table WHERE id = {std_name}";
每次執行這一條查詢時,返回的結果都可能不同,這取決於 std_name 的值。