目標: Flink從txt文件中讀取數據,寫入到mysql中
環境準備: 如果沒有mysql,可以按照下面的命令安裝mysql
# Register the MySQL 8.0 community yum repository, then install the server from it.
wget https://repo.mysql.com//mysql80-community-release-el7-3.noarch.rpm
yum -y install mysql80-community-release-el7-3.noarch.rpm
yum install mysql-community-server -y
# The server installed above runs as the mysqld service (not mariadb); the next
# step reads the temporary root password from /var/log/mysqld.log.
systemctl restart mysqld
查看mysql默認密碼
[root@localhost ~]# grep 'temporary password' /var/log/mysqld.log 2020-11-04T02:05:26.432219Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: qCk4b_3iEE;V
修改mysql默認密碼
mysql -uroot -p'qCk4b_3iEE;V' mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'Mafei@20201104'; Query OK, 0 rows affected (0.03 sec)
允許所有主機連接mysql
-- Qualify the table with the mysql schema: the client session was opened without
-- selecting a default database, so a bare `user` table cannot be resolved.
mysql> update mysql.user set host = '%' where user = 'root';
# Restart the server so it re-reads the grant tables changed by the UPDATE above.
systemctl restart mysqld
創建mysql的數據庫及表
create database test default character set utf8mb4 collate utf8mb4_unicode_ci;
-- ---------------------------- -- Table structure for sensor_temp -- ---------------------------- DROP TABLE IF EXISTS `sensor_temp`; CREATE TABLE `sensor_temp` ( `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL, `temp` double(10,0) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; SET FOREIGN_KEY_CHECKS = 1;
新建一個scala object 類,JdbcSink
package com.mafei.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

/**
 * Flink job that reads comma-separated sensor readings from a text file and
 * writes them to the MySQL table `sensor_temp` through [[MyJdbcSinkFunc]].
 *
 * `SensorReadingTest5` is declared elsewhere in the project; this file only
 * relies on its three-argument constructor and its `id` / `temperature` fields.
 */
object JdbcSink {

  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Configure parallelism BEFORE building the pipeline so that every operator,
    // including the file source, is created with it. The sink's
    // update-then-insert logic keeps the last value seen per id, so records
    // should reach it in file order.
    env.setParallelism(1)

    val inputStream =
      env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    inputStream.print()

    // Convert each text line into the case-class representation.
    val dataStream = inputStream.map { data =>
      // Split on ',' and trim each field: split() yields strings, and
      // toLong / toDouble reject surrounding whitespace.
      val arr = data.split(",")
      SensorReadingTest5(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    }

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute()
  }
}

/**
 * Sink that upserts sensor readings into `sensor_temp`: it first tries to
 * update the row whose `ids` matches the reading and inserts a new row only
 * when the update touched nothing.
 *
 * The defaults reproduce the previously hard-coded connection settings, so
 * `new MyJdbcSinkFunc()` behaves as before.
 * NOTE(review): credentials should not live in source; pass them in from
 * configuration when this leaves the tutorial setting.
 *
 * @param url      JDBC connection URL
 * @param user     database user name
 * @param password database password
 */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://10.0.83.82:3306/test",
    user: String = "root",
    password: String = "Mafei@20201104"
) extends RichSinkFunction[SensorReadingTest5] {

  // Connection and prepared statements; assigned in open(), released in close().
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and prepares the insert and update statements. */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  /**
   * Writes one reading: updates the existing row for `in.id`, or inserts a new
   * row when no row was updated.
   */
  override def invoke(in: SensorReadingTest5): Unit = {
    updateStmt.setDouble(1, in.temperature)
    updateStmt.setString(2, in.id)
    // NOTE(review): this relies on the driver reporting a non-zero count when a
    // row exists even if its value is unchanged; confirm the driver's
    // affected-rows setting, otherwise duplicates may be inserted.
    if (updateStmt.executeUpdate() == 0) {
      println("執行了插入操做。。。")
      insertStmt.setString(1, in.id)
      insertStmt.setDouble(2, in.temperature)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection. Each resource is closed only
   * if it was actually created, and a failure closing one does not prevent the
   * remaining ones from being closed.
   */
  override def close(): Unit = {
    try Option(insertStmt).foreach(_.close())
    finally {
      try Option(updateStmt).foreach(_.close())
      finally Option(conn).foreach(_.close())
    }
  }
}