// Local path of the IPIP (.ipdb) city database; distributed via sc.addFile and
// then resolved by bare file name through SparkFiles.get.
val ipdbPath: String = "/home/users/luozg/mydata4vipday4_cn.ipdb"

sc.addFile(ipdbPath)

// Load the database once on the driver and broadcast it to the executors.
// SparkFiles.get expects only the file name, so derive it from ipdbPath instead
// of repeating the literal.
// NOTE(review): broadcasting requires net.ipip.ipdb.City to be serializable by the
// configured Spark serializer (e.g. Kryo) — confirm.
val city_db: org.apache.spark.broadcast.Broadcast[net.ipip.ipdb.City] =
  sc.broadcast(new net.ipip.ipdb.City(SparkFiles.get(new java.io.File(ipdbPath).getName)))
/** Looks up the city name of `ip` in the broadcast IPIP database.
  *
  * `"CN"` is passed to `findInfo` as the language argument.
  *
  * @param ip the IP address string to look up
  * @return the city name, or `""` if the lookup throws, finds no record, or the
  *         record has no city name (callers can compare with `.equals` safely)
  */
def ipcity(ip: String): String =
  try {
    // Both the record and the field may be null; wrap each so a miss becomes "".
    Option(city_db.value.findInfo(ip, "CN"))
      .flatMap(info => Option(info.getCityName()))
      .getOrElse("")
  } catch {
    case _: InvalidDatabaseException =>
      println("InvalidDatabaseException")
      ""
    case _: IPFormatException =>
      println("IPFormatException")
      ""
    case _: Exception =>
      println("Exception")
      ""
  }
/** Looks up the region (province) name of `ip` in the broadcast IPIP database.
  *
  * `"CN"` is passed to `findInfo` as the language argument.
  *
  * @param ip the IP address string to look up
  * @return the region name, or `""` if the lookup throws, finds no record, or the
  *         record has no region name (callers can compare with `.equals` safely)
  */
def ipprovince(ip: String): String =
  try {
    // Both the record and the field may be null; wrap each so a miss becomes "".
    Option(city_db.value.findInfo(ip, "CN"))
      .flatMap(info => Option(info.getRegionName()))
      .getOrElse("")
  } catch {
    case _: InvalidDatabaseException =>
      println("InvalidDatabaseException")
      ""
    case _: IPFormatException =>
      println("IPFormatException")
      ""
    case _: Exception =>
      println("Exception")
      ""
  }
/** Looks up the country name of `ip` in the broadcast IPIP database.
  *
  * `"CN"` is passed to `findInfo` as the language argument.
  *
  * @param ip the IP address string to look up
  * @return the country name, or `""` if the lookup throws, finds no record, or the
  *         record has no country name (callers can compare with `.equals` safely)
  */
def ipcountry(ip: String): String =
  try {
    // Both the record and the field may be null; wrap each so a miss becomes "".
    Option(city_db.value.findInfo(ip, "CN"))
      .flatMap(info => Option(info.getCountryName()))
      .getOrElse("")
  } catch {
    case _: InvalidDatabaseException =>
      println("InvalidDatabaseException")
      ""
    case _: IPFormatException =>
      println("IPFormatException")
      ""
    case _: Exception =>
      println("Exception")
      ""
  }
// Day to process, as yyyyMMdd. The input layout nests a yyyyMM month directory
// above the day directory, so both are derived from this single value.
val logDay = "20190221"

// Input: every gzip file in that day's directory under raw/200/attack.
val logpath = s"hdfs://ksdc-cluster/raw/200/attack/${logDay.take(6)}/$logDay/*.gz"

// Output directory.
val outpath = "hdfs://ksdc-cluster/jjfy/luozg/logs/cd_attack/"
println(sc) defmyfilter(line:String)={ val fields = line.split(" ") if(fields.length >= 25){ val client_ip = fields(5) ipprovince(client_ip).equals("四川") && ipcity(client_ip).equals("成都") }else{ false }