在spark-shell中使用ipdb格式IP库

编译jar

将IPIP官方提供的Java版本解析代码(https://github.com/ipipdotnet/ipdb-java)克隆到本地,编译打包成jar。
提供一个已经编译好的jar包: hdfs://ksdc-cluster/jjfy/luozg/ipdb.jar

编写scala代码

示例代码:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Calendar
import java.io.IOException
import java.lang.Exception
import net.ipip.ipdb.City
import net.ipip.ipdb.CityInfo
import net.ipip.ipdb.InvalidDatabaseException
import net.ipip.ipdb.IPFormatException
import org.apache.spark.SparkFiles

// Distribute the ipdb database file to every executor via Spark's file-shipping
// mechanism, so SparkFiles.get resolves a local copy on each worker.
// NOTE(review): assumes this path exists on the driver host — confirm.
sc.addFile("/home/users/luozg/mydata4vipday4_cn.ipdb")
// Broadcast a single parsed City reader so each task reuses one instance
// instead of re-parsing the database per record.
val city_db:org.apache.spark.broadcast.Broadcast[net.ipip.ipdb.City] = sc.broadcast(new City(SparkFiles.get("mydata4vipday4_cn.ipdb")))

/**
 * Resolves the city name for an IP address via the broadcast ipdb database.
 *
 * @param ip IP address string, looked up with language code "CN"
 * @return the city name, or "" when the lookup fails. The original handlers
 *         returned Unit (the result of println), which widened the result type
 *         to Any and made downstream .equals comparisons silently false;
 *         returning "" keeps the type String while preserving that callers
 *         see a non-match on failure. The failure kind is still printed.
 */
def ipcity(ip:String): String = {
  try {
    city_db.value.findInfo(ip,"CN").getCityName()
  } catch {
    case ex : InvalidDatabaseException => println("InvalidDatabaseException"); ""
    case ex : IPFormatException => println("IPFormatException"); ""
    case ex : Exception => println("Exception"); ""
  }
}

/**
 * Resolves the province/region name for an IP address via the broadcast
 * ipdb database.
 *
 * @param ip IP address string, looked up with language code "CN"
 * @return the region name, or "" when the lookup fails. The original handlers
 *         returned Unit (the result of println), widening the result type to
 *         Any; "" keeps the type String and still compares unequal to any
 *         real province name. The failure kind is still printed.
 */
def ipprovince(ip:String): String = {
  try {
    city_db.value.findInfo(ip,"CN").getRegionName()
  } catch {
    case ex : InvalidDatabaseException => println("InvalidDatabaseException"); ""
    case ex : IPFormatException => println("IPFormatException"); ""
    case ex : Exception => println("Exception"); ""
  }
}

/**
 * Resolves the country name for an IP address via the broadcast ipdb database.
 *
 * @param ip IP address string, looked up with language code "CN"
 * @return the country name, or "" when the lookup fails. The original handlers
 *         returned Unit (the result of println), widening the result type to
 *         Any; "" keeps the type String and still compares unequal to any
 *         real country name. The failure kind is still printed.
 */
def ipcountry(ip:String): String = {
  try {
    city_db.value.findInfo(ip,"CN").getCountryName()
  } catch {
    case ex : InvalidDatabaseException => println("InvalidDatabaseException"); ""
    case ex : IPFormatException => println("IPFormatException"); ""
    case ex : Exception => println("Exception"); ""
  }
}



// Input: gzipped raw attack logs for 2019-02-21; output: filtered result dir.
val logpath = "hdfs://ksdc-cluster/raw/200/attack/201902/20190221/*.gz"
val outpath = "hdfs://ksdc-cluster/jjfy/luozg/logs/cd_attack/"

// Debug print of the SparkContext to confirm the shell session is live.
println(sc)
/**
 * Keeps log lines whose client IP (space-separated field 5) resolves to
 * province 四川 and city 成都; lines with fewer than 25 fields are dropped.
 */
def myfilter(line:String)={
  val fields = line.split(" ")
  if (fields.length < 25) {
    // Malformed / short line: cannot safely read field 5, reject it.
    false
  } else {
    val clientIp = fields(5)
    // && short-circuits: the city lookup only runs when the province matches.
    ipprovince(clientIp).equals("四川") && ipcity(clientIp).equals("成都")
  }
}

// Read the logs, keep only lines from Chengdu client IPs, write back to HDFS.
// NOTE(review): saveAsTextFile fails if outpath already exists — clean it up first.
sc.textFile(logpath).filter(myfilter).saveAsTextFile(outpath)
// Terminate the spark-shell once the job finishes (script is run with -i).
sys.exit

提交任务:

spark-shell --jars /home/users/luozg/ipdb.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer -i ~/ksdc_task/ipdb_test.scala