Commit
基本数据清洗 (Basic data cleaning)
nyancat-hu committed Aug 1, 2022
1 parent 20f11f2 commit 5307e9b
Showing 17 changed files with 110 additions and 596 deletions.
2 changes: 1 addition & 1 deletion SZT-ETL/ETL-Flink/pom.xml
@@ -142,7 +142,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<version>3.2.2</version>
<executions>
<execution>
<!-- Declare binding to Maven's compile phase -->
2 changes: 1 addition & 1 deletion SZT-ETL/ETL-Flink/src/main/resources/kafka.properties
@@ -1,4 +1,4 @@
kafka.broker-list=cdh231:9092,cdh232:9092,cdh233:9092
kafka.broker-list=topview102:9092,topview103:9092,topview104:9092

#kafka.producer.topic=topic-flink-szt-all
#kafka.producer.topic=topic-flink-szt-test
@@ -15,7 +15,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand,
*
*/
object Jsons2Redis {
val SVAE_PATH = "/tmp/szt-data/szt-data-page.jsons"
val SVAE_PATH = "D:\\IDEAworkspace\\SZT-bigdata\\szt-data\\szt-data-page-all.json"

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -28,7 +28,7 @@
})

// Define the Redis connection parameters
val jedis = new FlinkJedisPoolConfig.Builder().setHost("localhost").build()
val jedis = new FlinkJedisPoolConfig.Builder().setHost("Topview104").build()

// Write to the Redis sink
s.addSink(new RedisSink(jedis, new MyRedisSinkFun))
@@ -39,6 +39,7 @@

case class MyRedisSinkFun() extends RedisMapper[JSONObject] {
override def getCommandDescription: RedisCommandDescription = {
// The command is HSET (hash set)
new RedisCommandDescription(RedisCommand.HSET, "szt:pageJson")
}

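For context, Flink's RedisMapper needs two more methods that the hunk above does not show: getKeyFromData and getValueFromData. A minimal sketch, assuming the hash field is a page number carried in the JSON (the field name "page" is an assumption) and the value is the raw page JSON:

import com.alibaba.fastjson.JSONObject
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class RedisSinkSketch() extends RedisMapper[JSONObject] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "szt:pageJson")

  // Hash field: the page number inside the JSON (assumed field name)
  override def getKeyFromData(data: JSONObject): String = data.getString("page")

  // Hash value: the raw page JSON
  override def getValueFromData(data: JSONObject): String = data.toJSONString
}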
@@ -25,6 +25,7 @@ object Redis2Csv {
.filter(x => {
val json = JSON.parseObject(x)
val deal_date = json.getString("deal_date")
// Filter: keep only data for the given day
deal_date.startsWith(ymd)
})
.addSink(new MyCsvSinkFun(ymd))
@@ -39,9 +39,9 @@ object Redis2ES {
env.setParallelism(1)

val hosts = List[HttpHost](
new HttpHost("cdh231", 9200),
new HttpHost("cdh232", 9200),
new HttpHost("cdh233", 9200)
new HttpHost("Topview102", 9200),
new HttpHost("Topview103", 9200),
new HttpHost("Topview104", 9200)
)

val esSink = new ElasticsearchSink.Builder(
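The ElasticsearchSink.Builder call above is cut off; a sketch of how it presumably continues, reusing the MyESSinkFun shown later in this commit (the index name, the bulk-flush setting, and the stream variable s are assumptions):

import scala.collection.JavaConverters._

val esSink = new ElasticsearchSink.Builder[String](
  hosts.asJava,                     // the HttpHost list defined above
  MyESSinkFun("szt")                // assumed index name
)
esSink.setBulkFlushMaxActions(1)    // flush every record immediately (assumption)
s.addSink(esSink.build())           // s: the upstream DataStream[String]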
@@ -1,12 +1,13 @@
package cn.java666.etlflink.app

import java.util.Properties

import cn.java666.etlflink.source.MyRedisSourceFun
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

import scala.util.Random

/**
* @author Geek
* @date 2020-04-14 04:35:36
@@ -25,7 +26,7 @@ object Redis2Kafka {
val s = env.addSource[String](new MyRedisSourceFun)
.map(x => {
// TODO Pretend to rest for a moment; if the client thinks this is too slow, they can pay for optimization!!! But here we really do need the sleep, to simulate a continuous stream of injected data
//Thread.sleep(Random.nextInt(1000))
Thread.sleep(Random.nextInt(1000))
x
})

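The tail of Redis2Kafka is cut off above; a self-contained sketch of the likely wiring, reading the broker list from kafka.properties and writing to a topic (the topic name is an assumption, taken from the "===> kafka: topic-flink-szt" comment in MyRedisSourceFun below):

import java.util.Properties

import cn.java666.etlflink.source.MyRedisSourceFun
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object Redis2KafkaSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("kafka.properties"))

    val producer = new FlinkKafkaProducer011[String](
      props.getProperty("kafka.broker-list"),  // topview102:9092,topview103:9092,topview104:9092
      "topic-flink-szt",                       // assumed producer topic
      new SimpleStringSchema()
    )

    env.addSource[String](new MyRedisSourceFun)
      .addSink(producer)
    env.execute("Redis2KafkaSketch")
  }
}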
@@ -16,7 +16,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction
*/

case class MyCsvSinkFun(day: String) extends SinkFunction[String] {
val SAVE_PATH = "/tmp/szt-data/szt-data_" + day + ".csv"
val SAVE_PATH = "D:\\IDEAworkspace\\SZT-bigdata\\szt-data\\szt-data_" + day + ".csv"

override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
// 11 fields
@@ -48,11 +48,11 @@ case class MyCsvSinkFun(day: String) extends SinkFunction[String] {
.add(deal_money)
.add(equ_no)
.toString

// Append each record as its own line
FileUtil.appendUtf8String(csv + "\n", SAVE_PATH)

val i = FileUtil.readUtf8Lines(SAVE_PATH).size()
// Cross-checked against the ES database: the records match exactly!!!
println(i) // szt-data_2018-09-01.csv totals 1,229,180 records
// val i = FileUtil.readUtf8Lines(SAVE_PATH).size()
// // Cross-checked against the ES database: the records match exactly!!!
// println(i) // szt-data_2018-09-01.csv totals 1,229,180 records
}
}
@@ -22,6 +22,7 @@ case class MyESSinkFun(index: String) extends ElasticsearchSinkFunction[String]

val json = JSON.parseObject(element)
val indexReq = new IndexRequest(index).source(json, XContentType.JSON)
// Just send the data through this indexer
indexer.add(indexReq)
println(s"es save element=${element} | json=${json}")
}
@@ -17,7 +17,7 @@ import redis.clients.jedis.Jedis
* Customize freely according to business needs
*
*/

// This source splits each page's JSON into individual records by its data field (keeping only the contents of data)
case class MyRedisSourceFun() extends RichSourceFunction[String] {
var client: Jedis = _

@@ -40,7 +40,7 @@ case class MyRedisSourceFun() extends RichSourceFunction[String] {
val data = JSON.parseObject(xStr)
//if (data.size() != 11 && data.size() != 9) { // field counts are inconsistent here: 9 or 11
if (data.size() != 11) { // field counts are inconsistent here: 9 or 11
//System.err.println(" data error ------------------ x=" + x)// TODO optionally print dirty records
System.err.println(" data error ------------------ x=" + x)// TODO optionally print dirty records
} else {
// Keep only source records with 11 fields ===> kafka: topic-flink-szt
ctx.collect(xStr)
@@ -16,15 +16,15 @@ object RedisUtil {
jedisPoolConfig.setBlockWhenExhausted(true) // whether to wait when the pool is exhausted
jedisPoolConfig.setMaxWaitMillis(2000) // max wait time when busy, in milliseconds
jedisPoolConfig.setTestOnBorrow(false) // test each connection when it is borrowed
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "localhost", 6379)
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "Topview104", 6379)

// Get a Redis connection directly
def getJedisClient: Jedis = {
jedisPool.getResource
}

// Test passed
//def main(args: Array[String]): Unit = {
// println(getJedisClient.hget("szt:pageJson", "1"))
//}
// def main(args: Array[String]): Unit = {
// println(getJedisClient.hget("szt:pageJson", "1"))
// }
}
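The commented-out self-test corresponds roughly to the following usage (a sketch; the hash key and field come from the Jsons2Redis sink above):

val jedis = RedisUtil.getJedisClient
println(jedis.hget("szt:pageJson", "1"))  // read back the first cached page
jedis.close()                             // return the connection to the pool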
4 changes: 4 additions & 0 deletions SZT-ETL/ETL-SpringBoot/pom.xml
@@ -26,6 +26,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
@@ -46,7 +46,7 @@ public Docket defaultApi2() {
// group name
.groupName("ETL-SpringBoot")
.select()
// Specify the controller package to scan here
// Specify the controller package to scan here; the API endpoints inside it will be generated automatically
.apis(RequestHandlerSelectors.basePackage("cn.java666.etlspringboot.controller"))
.paths(PathSelectors.any())
.build();
@@ -3,7 +3,7 @@ spring.application.name=ETL-SpringBoot
server.port=6661
server.address=localhost

spring.redis.host=localhost
spring.redis.host=Topview104
spring.redis.port=6379

#spring.data.elasticsearch.cluster-nodes=cdh231:9200,cdh232:9200,cdh233:9200
5 changes: 3 additions & 2 deletions SZT-common/src/main/java/cn/java666/sztcommon/SZTData.java
@@ -17,10 +17,10 @@
* Will need further decomposition later.
*/
public class SZTData {
String SAVE_PATH = "/tmp/szt-data/szt-data-page-all.json";
String SAVE_PATH = "D:\\IDEAworkspace\\SZT-bigdata\\szt-data\\szt-data-page-all.json";

// TODO Apply for your own appKey: https://opendata.sz.gov.cn/data/api/toApiDetails/29200_00403601
String appKey = "***";
String appKey = "ba64e4fac5d044b2860ba23c330cfce0";

/**
* This process may take an entire night. If it is interrupted, check the page of the last saved record, then adjust the starting value of i and resume fetching.
@@ -33,6 +33,7 @@ public void saveData() {
// Be sure to append a newline, otherwise later processing will be a disaster.
// Be sure to append a newline, otherwise later processing will be a disaster.
// Be sure to append a newline, otherwise later processing will be a disaster.
System.out.println(s+ "\n");
FileUtil.appendUtf8String(s + "\n", SAVE_PATH);
}

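A sketch of the resume-friendly fetch loop described in the Javadoc above, written here in Scala; fetchPage is a hypothetical stand-in for the real open-data API call, whose endpoint and paging parameters are not shown in this diff:

import cn.hutool.core.io.FileUtil

object SztDataFetchSketch {
  // Hypothetical: fetch one page of JSON from the Shenzhen open-data API
  def fetchPage(page: Int): String = ???

  def saveData(startPage: Int, endPage: Int, savePath: String): Unit = {
    for (i <- startPage to endPage) {
      val s = fetchPage(i)
      // Always append a newline so that each page is exactly one line in the file
      FileUtil.appendUtf8String(s + "\n", savePath)
    }
  }
}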
@@ -39,7 +39,7 @@ public String desc() {
return desc;
}

private SztEnum(String val, String desc) {
SztEnum(String val, String desc) {
this.val = val;
this.desc = desc;
}
@@ -51,16 +51,19 @@ public class ParseCardNo {

public static String parse(@NotNull String no) {
if (StrUtil.isBlank(no)) {
return "滚!!!";
// This method returns true if the given string is empty or contains only whitespace code points, false otherwise.
return "无效卡序列";
}

char[] array = no.toCharArray();
StringJoiner joiner = new StringJoiner("");
for (char c : array) {
if (NumberUtil.isNumber(no)) {
// plaintext to ciphertext
String v = NO2CharMap.get(c).toString();
joiner.add(v);
} else {
// ciphertext to plaintext
String v = char2NOMap.get(c).toString();
joiner.add(v);
}
Expand Down