Flink 1.8's support for Hive is not great: reading a result set of about 3 million rows took 2 hours, so I plan to migrate this job to Spark. The code is posted below. I later found the real problem was that the SQL should not have carried a WHERE condition; once it was removed, the speed was acceptable.
Maven dependencies:
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
```
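Before wiring the driver into Flink, it can be worth checking that `hive-jdbc` reaches HiveServer2 at all. This is a minimal plain-JDBC sketch; the host, port, and database name are placeholders, not values from this post:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveConnTest {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver declared in the pom above
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // jdbc:hive2:// is the HiveServer2 URL scheme; host/port/db are placeholders
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://hive-host:10000/default", "user", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println("connected: " + rs.getInt(1));
            }
        }
    }
}
```

Port 10000 is only the common HiveServer2 default; adjust it to your deployment.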
Java code:
```java
// Imports for the Flink/JDK classes used below. HttpClientUtil, Utils,
// RedisDataModel, JobConstants, AppConfig, RedisOutputFormat and logger
// are this project's own classes and are not shown here.
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

private final static String driverName = "org.apache.hive.jdbc.HiveDriver"; // JDBC driver class
private final static String url = ";";      // Hive JDBC URL + database name (redacted)
private final static String user = "";      // username (redacted)
private final static String password = "!"; // password (redacted)
private final static String table = "";     // table name (redacted)
private final static String sql = " ";      // base query (redacted)

public static void main(String[] arg) throws Exception {
    long time = System.currentTimeMillis();
    HttpClientUtil.sendDingMessage("Start syncing hive-" + table + ";" + Utils.getTimeString());

    // Initialize the batch execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    try {
        // Two string columns: user and name
        TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
        String[] colName = new String[]{"user", "name"};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName);

        JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(password);

        // Read yesterday's partition
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date());
        calendar.add(Calendar.DATE, -1);
        SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd");
        String d = sj.format(calendar.getTime());

        JDBCInputFormat jdbcInputFormat = builder
                .setQuery(sql + " and dt='" + d + "' limit 100000000")
                .setRowTypeInfo(rowTypeInfo)
                .finish();
        DataSource<Row> rowlist = env.createInput(jdbcInputFormat);

        // Drop malformed rows, then map each remaining row to a Redis model
        DataSet<RedisDataModel> temp = rowlist.filter(new FilterFunction<Row>() {
            @Override
            public boolean filter(Row row) throws Exception {
                String key = row.getField(0).toString();
                String value = row.getField(1).toString();
                return !(key.length() < 5 || key.startsWith("-") || key.startsWith("$")
                        || value.length() < 5 || value.startsWith("-") || value.startsWith("$"));
            }
        }).map(new MapFunction<Row, RedisDataModel>() {
            @Override
            public RedisDataModel map(Row value) throws Exception {
                RedisDataModel m = new RedisDataModel();
                m.setExpire(-1);
                m.setKey(JobConstants.REDIS_FLINK_IMEI_USER + value.getField(0).toString());
                m.setGlobal(true);
                m.setValue(value.getField(1).toString());
                return m;
            }
        });

        // Note: count() triggers an extra execution of the plan just for this message
        HttpClientUtil.sendDingMessage("Finished reading hive-" + table + "; pushing " + temp.count() + " records to Redis; " + Utils.getTimeString());

        RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat()
                .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER))
                .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS))
                .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE)))
                .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL)))
                .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS)))
                .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW)))
                .finish();
        temp.output(redisOutput);

        env.execute("hive-" + table + " sync");
        HttpClientUtil.sendDingMessage("Sync of hive-" + table + " finished, took " + (System.currentTimeMillis() - time) / 1000 + "s");
    } catch (Exception e) {
        logger.error("", e);
        HttpClientUtil.sendDingMessage("Sync of hive-" + table + " failed, timestamp: " + time + ", cause: " + e.toString());
    }
}
```
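The fix mentioned at the top, removing the WHERE condition from the SQL, would change the query construction roughly as follows. This is a minimal sketch, assuming the base query keeps only the `dt` partition predicate (so Hive can still prune partitions) while every other condition moves into the `filter(...)` already shown above; the table and column names are placeholders, not values from this post.

```java
// Sketch only: "select user, name from some_table" stands in for the redacted query.
JDBCInputFormat fasterInput = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driverName)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(password)
        // partition predicate only; no further WHERE conditions in the SQL
        .setQuery("select user, name from some_table where dt='" + d + "' limit 100000000")
        .setRowTypeInfo(rowTypeInfo)
        .finish();
```

`RedisOutputFormat` and `RedisDataModel` are this project's own classes and their source is not shown. For readers who want to reproduce the job, here is a hypothetical sketch of what such an OutputFormat could look like on top of Jedis with a sentinel pool; the constructor, field names, and `RedisDataModel` getters are all assumptions, not the actual API used above.

```java
import java.io.IOException;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

// Hypothetical reconstruction of a Redis OutputFormat; not the post's actual code.
public class RedisOutputFormatSketch extends RichOutputFormat<RedisDataModel> {

    private final String masterName;
    private final Set<String> sentinels;
    private transient JedisSentinelPool pool;

    public RedisOutputFormatSketch(String masterName, Set<String> sentinels) {
        this.masterName = masterName;
        this.sentinels = sentinels;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure here
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // one connection pool per parallel task instance
        pool = new JedisSentinelPool(masterName, sentinels, new GenericObjectPoolConfig());
    }

    @Override
    public void writeRecord(RedisDataModel record) throws IOException {
        try (Jedis jedis = pool.getResource()) {
            if (record.getExpire() > 0) {
                // setex writes the value with a TTL in seconds
                jedis.setex(record.getKey(), record.getExpire(), record.getValue());
            } else {
                // expire == -1 in the job above: store without TTL
                jedis.set(record.getKey(), record.getValue());
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (pool != null) {
            pool.close();
        }
    }
}
```

Creating the pool in `open()` rather than in the constructor matters because the OutputFormat is serialized and shipped to each parallel task; the `transient` pool is then created once per task instance on the worker.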