Skip to content

Commit

Permalink
feat(agent): Add basic support for EKS Pods on Fargate
Browse files Browse the repository at this point in the history
Signed-off-by: Ferenc Géczi <[email protected]>
  • Loading branch information
Ferenc- committed Nov 17, 2023
1 parent 7d8a789 commit ab91cee
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 0 deletions.
96 changes: 96 additions & 0 deletions instana/agent/aws_eks_fargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# (c) Copyright IBM Corp. 2023

"""
The Instana agent (for AWS EKS Fargate) that manages
monitoring state and reporting that data.
"""
import os
import time
from instana.options import AWSFargateOptions
from instana.collector.aws_fargate import AWSFargateCollector
from instana.collector.helpers.eks import get_pod_name
from instana.log import logger
from instana.util import to_json
from instana.agent.base import BaseAgent
from instana.version import VERSION


class EKSFargateAgent(BaseAgent):
""" In-process agent for AWS Fargate """
def __init__(self):
super(AWSFargateAgent, self).__init__()

self.options = AWSFargateOptions()
self.collector = None
self.report_headers = None
self._can_send = False

# Update log level (if INSTANA_LOG_LEVEL was set)
self.update_log_level()

logger.info("Stan is on the AWS Fargate scene. Starting Instana instrumentation version: %s", VERSION)

if self._validate_options():
self._can_send = True
self.collector = AWSFargateCollector(self)
self.collector.start()
else:
logger.warning("Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. "
"We will not be able monitor this AWS Fargate cluster.")

self.podname = get_pod_name()

def can_send(self):
"""
Are we in a state where we can send data?
@return: Boolean
"""
return self._can_send

def get_from_structure(self):
"""
Retrieves the From data that is reported alongside monitoring data.
@return: dict()
"""

return {'hl': True, 'cp': 'k8s', 'e': self.podname}

def report_data_payload(self, payload):
"""
Used to report metrics and span data to the endpoint URL in self.options.endpoint_url
"""
response = None
try:
if self.report_headers is None:
# Prepare request headers
self.report_headers = dict()
self.report_headers["Content-Type"] = "application/json"
self.report_headers["X-Instana-Host"] = self.collector.get_fq_arn()
self.report_headers["X-Instana-Key"] = self.options.agent_key

self.report_headers["X-Instana-Time"] = str(round(time.time() * 1000))

response = self.client.post(self.__data_bundle_url(),
data=to_json(payload),
headers=self.report_headers,
timeout=self.options.timeout,
verify=self.options.ssl_verify,
proxies=self.options.endpoint_proxy)

if not 200 <= response.status_code < 300:
logger.info("report_data_payload: Instana responded with status code %s", response.status_code)
except Exception as exc:
logger.debug("report_data_payload: connection error (%s)", type(exc))
return response

def _validate_options(self):
"""
Validate that the options used by this Agent are valid. e.g. can we report data?
"""
return self.options.endpoint_url is not None and self.options.agent_key is not None

def __data_bundle_url(self):
"""
URL for posting metrics to the host agent. Only valid when announced.
"""
return "%s/bundle" % self.options.endpoint_url
59 changes: 59 additions & 0 deletions instana/collector/aws_eks_fargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# (c) Copyright IBM Corp. 2023

"""
Collector for EKS Pods on AWS Fargate: Manages the periodic collection of metrics & snapshot data
"""
from instana.log import logger
from instan.agentbase import BaseCollector
from instana.util import DictionaryOfStan


class EKSFargateCollector(BaseCollector):
""" Collector for EKS Pods on AWS Fargate """

def __init__(self, agent):
super(EKSFargateCollector, self).__init__(agent)
logger.debug("Loading Collector for EKS Pods on AWS Fargate ")

self.snapshot_data = DictionaryOfStan()
self.snapshot_data_sent = False

def should_send_snapshot_data(self):
return int(time()) - self.snapshot_data_last_sent > self.snapshot_data_interval

def collect_snapshot(self, event, context):
self.context = context
self.event = event

try:
plugin_data = dict()
plugin_data["name"] = "com.instana.plugin.aws.eks"
plugin_data["entityId"] = self.get_fq_arn()
self.snapshot_data["plugins"] = [plugin_data]
except Exception:
logger.debug("collect_snapshot error", exc_info=True)
return self.snapshot_data

def prepare_payload(self):
payload = DictionaryOfStan()
payload["spans"] = []
payload["metrics"]["plugins"] = []

try:
if not self.span_queue.empty():
payload["spans"] = self.queued_spans()

with_snapshot = self.should_send_snapshot_data()

plugins = []
for helper in self.helpers:
plugins.extend(helper.collect_metrics(with_snapshot=with_snapshot))

payload["metrics"]["plugins"] = plugins

if with_snapshot is True:
self.snapshot_data_last_sent = int(time())
except Exception:
logger.debug("collect_snapshot error", exc_info=True)

return payload
Empty file.
32 changes: 32 additions & 0 deletions instana/collector/helpers/eks/pod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# (c) Copyright IBM Corp. 2023

""" Module to handle the collection of container metrics for EKS Pods on AWS Fargate """

def get_pod_name():
podname = os.environ.get('HOSTNAME')

if not podname:
logger.warning("Failed to determine podname from EKS hostname.")
return podname

# TODO: Remove this if still unused for correlation
def get_pod_uuid():
pod_uuid = None
parent_proc_cpuset = '/proc/1/cpuset'
if os.path.isfile(parent_proc_cpuset):
match = None
uuid_capture_regex = '.*[/]kubepods[/].*[/]pod(?P<UUID>.*)[/].*'
with open('/proc/1/cpuset', 'r') as file:
data = file.read().replace('\n', '')
match = re.search(uuid_capture_regex, data)
if match:
pod_uuid = match.group('UUID')
else:
logger.warning("Failed to determine Pod UUID on EKS because the file '%s' has a content"
" that doesn't match the expected pattern: '%s'",
parent_proc_cpuset, uuid_capture_regex)
else:
logger.warning("Failed to determine Pod UUID on EKS because the file '%s' does not exist.",
parent_proc_cpuset)

return pod_uuid
4 changes: 4 additions & 0 deletions instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ def __init__(self, **kwds):

self.zone = os.environ.get("INSTANA_ZONE", None)

class EKSFargateOptions(AWSFargateOptions):
""" Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate. """
def __init__(self, **kwds):
super(EKSFargateOptions, self).__init__()

class GCROptions(ServerlessOptions):
""" Options class for Google Cloud Run. Holds settings specific to Google Cloud Run. """
Expand Down
14 changes: 14 additions & 0 deletions instana/singletons.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# (c) Copyright Instana Inc. 2018

import os
import re
import subprocess
import sys
import opentracing

Expand All @@ -17,9 +19,12 @@

# Detect the environment where we are running ahead of time
aws_env = os.environ.get("AWS_EXECUTION_ENV", "")
uname_output = subprocess.run(['uname -r'], shell=True, stdout=subprocess.PIPE).stdout
aws_kernel = re.match('.+[.]amzn[0-9]+[.].+', uname_output.decode('ascii') if uname_output else '')
env_is_test = "INSTANA_TEST" in os.environ
env_is_aws_fargate = aws_env == "AWS_ECS_FARGATE"
env_is_aws_lambda = "AWS_Lambda_" in aws_env
env_is_kubernetes = "KUBERNETES_PORT" in os.environ
k_service = os.environ.get("K_SERVICE")
k_configuration = os.environ.get("K_CONFIGURATION")
k_revision = os.environ.get("K_REVISION")
Expand Down Expand Up @@ -53,6 +58,15 @@

agent = GCRAgent(service=k_service, configuration=k_configuration, revision=k_revision)
span_recorder = StanRecorder(agent)

# If this is a kubernetes environment and the kernel is from Amazon Linux,
# then assume that this process is running in an EKS Pod on Fargate.
elif env_is_kubernetes and aws_kernel:
from .agent.aws_eks_fargate import EKSFargateAgent
from .recorder import StanRecorder

agent = EKSFargateAgent()
span_recorder = StanRecorder(agent)
else:
from .agent.host import HostAgent
from .recorder import StanRecorder
Expand Down

0 comments on commit ab91cee

Please sign in to comment.