# service.py - Service model built from Shodan, Censys and BinaryEdge scan data.
from datetime import datetime, UTC
from typing import Dict, List, Optional

from pydantic import BaseModel, Field

from common_osint_model.models import ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger
from common_osint_model.models.http import HTTPComponent
from common_osint_model.models.ssh import SSHComponent
from common_osint_model.models.tls import TLSComponent
from common_osint_model.models.dns import DNSComponent
from common_osint_model.utils import hash_all
class Service(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger):
    """Represents a single service answering connections on specific ports.

    Instances are usually created through ``from_shodan``, ``from_censys`` or ``from_binaryedge``, which map
    the raw result of the respective scanning service onto the fields below.
    """

    port: int
    # Banner is optional as not every scanning service offers complete banners as response. Banners might be
    # reconstructed from the data, but some attributes might have the wrong order then (e.g. HTTP headers).
    # The according hashes are also not reliable because of this.
    banner: Optional[str] = None
    md5: Optional[str] = None
    sha1: Optional[str] = None
    sha256: Optional[str] = None
    murmur: Optional[str] = None
    # Every service object should include these timestamps. "timestamp" can be used for tracking the observation
    # timestamp from scanning services (e.g. Shodan).
    # A default_factory is required here: a plain ``datetime.now(UTC)`` default is evaluated only once, when the
    # class body is executed, so every instance would share that single import-time value.
    first_seen: Optional[datetime] = Field(default_factory=lambda: datetime.now(UTC))
    last_seen: Optional[datetime] = Field(default_factory=lambda: datetime.now(UTC))
    timestamp: Optional[datetime] = None
    # We need to include every possible service component here. In order to not export empty dictionary keys, the
    # class object can be exported with dict(exclude_none=True), so e.g. empty tls keys are skipped.
    http: Optional[HTTPComponent] = None
    tls: Optional[TLSComponent] = None
    ssh: Optional[SSHComponent] = None
    dns: Optional[DNSComponent] = None
    # Typically hosts consist of different services which might be discovered by different scanning services, so
    # remarking which service was observed by which scanner might be a good idea.
    source: str

    @classmethod
    def from_shodan(cls, d: Dict):
        """Creates an instance of this class using a dictionary with typical shodan data.

        If a list is passed instead of a dictionary, a log message is emitted and only the first item is used.
        The keys "port", "data" and "timestamp" are required; "ssh", "http", "ssl" and "dns" are optional and
        produce the matching component when present.

        :param d: Shodan data for one service (or a list of such dictionaries).
        :return: A new instance with ``source`` set to "shodan".
        :raises KeyError: If one of the required keys is missing.
        """
        if isinstance(d, list):
            cls.info("The dictionary given is a list. Typically this list represents multiple services. Iterate over "
                     "the list to create Service objects for every item available. "
                     "This method just uses the first item.")
            d = d[0]

        port = d["port"]
        sshobj = SSHComponent.from_shodan(d) if "ssh" in d else None
        httpobj = HTTPComponent.from_shodan(d) if "http" in d else None
        # Shodan stores the TLS details below the "ssl" key.
        tlsobj = TLSComponent.from_shodan(d) if "ssl" in d else None
        dnsobj = DNSComponent.from_shodan(d) if "dns" in d else None

        banner = d["data"]
        md5, sha1, sha256, murmur = hash_all(banner.encode("utf-8"))
        return cls(
            port=port,
            banner=banner,
            md5=md5,
            sha1=sha1,
            sha256=sha256,
            murmur=murmur,
            ssh=sshobj,
            http=httpobj,
            tls=tlsobj,
            dns=dnsobj,
            timestamp=datetime.fromisoformat(d["timestamp"]),
            source="shodan",
        )

    @classmethod
    def from_censys(cls, d: Dict):
        """Creates an instance of this class using a dictionary with typical Censys data.

        Only "port" is required. The hashes are calculated only if a non-empty "banner" is available, and the
        "http", "tls", "ssh" and "dns" components are created only if the respective key is present.

        :param d: Censys data for one service.
        :return: A new instance with ``source`` set to "censys".
        :raises KeyError: If "port" is missing.
        :raises ValueError: If "observed_at" is present but not a valid ISO 8601 timestamp.
        """
        port = d["port"]
        banner = d.get("banner", None)
        md5, sha1, sha256, murmur = None, None, None, None
        if banner:
            md5, sha1, sha256, murmur = hash_all(banner.encode("utf-8"))

        httpobj = HTTPComponent.from_censys(d) if "http" in d else None
        tlsobj = TLSComponent.from_censys(d) if "tls" in d else None
        sshobj = SSHComponent.from_censys(d) if "ssh" in d else None
        dnsobj = DNSComponent.from_censys(d) if "dns" in d else None

        timestamp = None
        if "observed_at" in d:
            # Parse the complete string instead of cutting off a fixed number of characters: slicing is only
            # correct for a nine digit fraction followed by "Z" and silently drops seconds or fraction digits
            # for shorter values. datetime.fromisoformat() (Python 3.11+, which the datetime.UTC import of this
            # module already requires) accepts the "Z" suffix and truncates fractions beyond microseconds.
            timestamp = datetime.fromisoformat(d["observed_at"])
            if timestamp.tzinfo is not None:
                # Keep returning a naive datetime (expressed in UTC), as before.
                timestamp = timestamp.astimezone(UTC).replace(tzinfo=None)

        return cls(
            port=port,
            banner=banner,
            md5=md5,
            sha1=sha1,
            sha256=sha256,
            murmur=murmur,
            http=httpobj,
            tls=tlsobj,
            ssh=sshobj,
            dns=dnsobj,
            timestamp=timestamp,
            source="censys",
        )

    @classmethod
    def from_binaryedge(cls, d: List):
        """Creates an instance of this class using a list with typical BinaryEdge data. Contrary to the other
        scanning services, binaryedge provides multiple entries per port.

        The port is read from the first entry. Entries are told apart by their ``origin.type``: "webv2" feeds the
        HTTP component, "ssl-simple" the TLS component, "ssh" the SSH component and "service-simple" the banner.
        If a type occurs more than once, the last entry of that type is used.

        :param d: List of BinaryEdge entries, presumably all belonging to the same port.
        :return: A new instance with ``source`` set to "binaryedge".
        :raises IndexError: If the list is empty.
        :raises KeyError: If an entry lacks the keys accessed here.
        """
        port = d[0]["target"]["port"]
        # Maps every entry type to the position of the (last) entry of that type in the list.
        type_index: Dict[str, int] = {service["origin"]["type"]: idx for idx, service in enumerate(d)}

        httpobj = None
        if "webv2" in type_index:
            httpobj = HTTPComponent.from_binaryedge(d[type_index["webv2"]])
        tlsobj = None
        if "ssl-simple" in type_index:
            tlsobj = TLSComponent.from_binaryedge(d[type_index["ssl-simple"]])
        sshobj = None
        if "ssh" in type_index:
            sshobj = SSHComponent.from_binaryedge(d[type_index["ssh"]])

        banner = None
        md5, sha1, sha256, murmur = None, None, None, None
        if "service-simple" in type_index:
            banner = d[type_index["service-simple"]]["result"]["data"]["service"].get("banner", None)
            if banner:
                md5, sha1, sha256, murmur = hash_all(banner.encode("utf-8"))

        return cls(
            port=port,
            http=httpobj,
            tls=tlsobj,
            ssh=sshobj,
            banner=banner,
            md5=md5,
            sha1=sha1,
            sha256=sha256,
            murmur=murmur,
            source="binaryedge",
        )