import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetData {

    private static final Logger logger = LoggerFactory.getLogger(GetData.class);

    public static void main(String[] args) throws Exception {
        // Schema of the query result: a single string column
        TypeInformation<?>[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        // Build the JDBC input format (host, database and credentials are placeholders)
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")
                .setUsername("*")
                .setPassword("*")
                .setQuery("select name from words")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Row> source = env.createInput(jdbcInputFormat); // DataSet read from MySQL

        // Register the DataSet as a table and query it with SQL
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        tableEnv.registerDataSet("t2", source);

        Table query = tableEnv.sqlQuery("select * from t2");
        query.printSchema();

        DataSet<Row> result = tableEnv.toDataSet(query, Row.class);
        result.print();
        System.out.println(source.count());
    }
}
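For reference, the program above needs the Flink JDBC connector, the Table API planner and the MySQL JDBC driver on the classpath. A minimal sketch of the Maven dependencies, assuming Flink 1.8.0 built for Scala 2.11 (the versions and the driver version are assumptions based on the flink-1.8.0 path used below; adjust them to your environment):

<!-- Assumed dependencies; versions and Scala suffix are illustrative -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>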
Then use the maven-assembly-plugin to package all required classes into a single jar:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- Specify the class containing the main method entry point here -->
                <mainClass>*</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
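Because the assembly execution is bound to the package phase, running mvn clean package produces a single self-contained jar under target/; in this example the resulting artifact is collector-api-0.1.jar, which is the file the flink run command below points at.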
Then submit the jar to Flink to execute the SQL job:
./bin/flink run /flink-1.8.0/collector-api-0.1.jar