Skip to content

Commit

Permalink
[Agent] Respect connector_id defined in agentless (#2989)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb authored Nov 21, 2024
1 parent f393901 commit 4d861cc
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 365 deletions.
56 changes: 34 additions & 22 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
@@ -1,35 +1,47 @@
# This file is for internal experimental purposes only.
# Please do not use this file for any real-world workloads.

FROM docker.elastic.co/elastic-agent/elastic-agent:9.0.0-SNAPSHOT

USER root
# Install dependencies
RUN apt update
RUN apt install software-properties-common -y
RUN add-apt-repository ppa:deadsnakes/ppa
RUN apt install python3.11 python3.11-venv make -y

# TEMPORARY STUFF
# I need vim to edit some fields
# Git is needed to pull connectors repo
# yq is needed to append our input to elastic-agent.yml
RUN add-apt-repository ppa:rmescandon/yq
RUN apt install vim git yq -y

# Copy and install python agent client
# TODO: also package this with revision and everything

# Install apt-get dependencies
RUN apt-get update && apt-get install -y \
software-properties-common \
vim \
wget \
git \
make \
&& add-apt-repository ppa:deadsnakes/ppa \
&& apt-get update && apt-get install -y python3.11 python3.11-venv \
&& apt-get clean && rm -rf /var/lib/apt/lists/*

# Install Go-based yq separately
RUN wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq && \
chmod +x /usr/bin/yq

# Copy project files
COPY ./ /usr/share/connectors

# Set working directory
WORKDIR /usr/share/connectors

# Install Python agent client
RUN PYTHON=python3.11 make clean install install-agent

# Add component
# Agent directory name is dynamic and based on build hash, so we need to move in two steps
# Copy and move the component files into the dynamic agent directory
COPY ./resources/agent/python-elastic-agent-client /tmp/python-elastic-agent-client
COPY ./resources/agent/python-elastic-agent-client.spec.yml /tmp/python-elastic-agent-client.spec.yml
RUN mv /tmp/python-elastic-agent-client /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client
RUN mv /tmp/python-elastic-agent-client.spec.yml /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client.spec.yml

# add input to the elastic-agent.yml
RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' /usr/share/elastic-agent/elastic-agent.yml
RUN BUILD_DIR=$(cat /usr/share/elastic-agent/.build_hash.txt | cut -c 1-6) && \
mv /tmp/python-elastic-agent-client \
/usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client && \
mv /tmp/python-elastic-agent-client.spec.yml \
/usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client.spec.yml

WORKDIR /usr/share/elastic-agent
# Modify the elastic-agent.yml file
RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' \
/usr/share/elastic-agent/elastic-agent.yml

# Set the final working directory
WORKDIR /usr/share/elastic-agent
1 change: 1 addition & 0 deletions connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
is_native=True,
)
logger.info(f"Created connector record for {connector_id}")
except Exception as e:
Expand Down
11 changes: 10 additions & 1 deletion connectors/agent/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,16 @@ def _extract_unit_config_value(unit, field_name):
connector_name = _extract_unit_config_value(
connector_input, "connector_name"
)
connector_id = _extract_unit_config_value(connector_input, "id")

connector_id = _extract_unit_config_value(
connector_input, "connector_id"
)
# If "connector_id" is not explicitly provided as a policy parameter,
# use the "id" from the fleet policy as a fallback for the connector ID.
# The connector ID must be encoded in the policy to associate the integration
# with the connector being managed by the policy.
if not connector_id:
connector_id = _extract_unit_config_value(connector_input, "id")

logger.info(
f"Connector input found. Service type: {service_type}, Connector ID: {connector_id}, Connector Name: {connector_name}"
Expand Down
3 changes: 2 additions & 1 deletion connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def connector_check_in(self, connector_id):
)

async def connector_put(
self, connector_id, service_type, connector_name, index_name
self, connector_id, service_type, connector_name, index_name, is_native
):
return await self._retrier.execute_with_retry(
partial(
Expand All @@ -108,6 +108,7 @@ async def connector_put(
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)
)

Expand Down
9 changes: 7 additions & 2 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,19 @@ async def heartbeat(self, doc_id):
await self.update(doc_id=doc_id, doc={"last_seen": iso_utc()})

async def connector_put(
self, connector_id, service_type, connector_name=None, index_name=None
self,
connector_id,
service_type,
connector_name=None,
index_name=None,
is_native=False,
):
await self.api.connector_put(
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
index_name=index_name,
is_native=is_native,
)

async def connector_exists(self, connector_id):
Expand Down Expand Up @@ -231,7 +237,6 @@ async def supported_connectors(self, native_service_types=None, connector_ids=No
custom_connectors_query = {
"bool": {
"filter": [
{"term": {"is_native": False}},
{"terms": {"_id": connector_ids}},
]
}
Expand Down
2 changes: 1 addition & 1 deletion connectors/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class BaseDataSource:
advanced_rules_enabled = False
dls_enabled = False
incremental_sync_enabled = False
native_connector_api_keys_enabled = True
native_connector_api_keys_enabled = False

def __init__(self, configuration):
# Initialize to the global logger
Expand Down
37 changes: 0 additions & 37 deletions connectors/sync_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
from elasticsearch import (
AuthorizationException as ElasticAuthorizationException,
)
from elasticsearch import NotFoundError as ElasticNotFoundError

from connectors.config import DataSourceFrameworkConfig
from connectors.es.client import License, with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.es.license import requires_platinum_license
from connectors.es.management_client import ESManagementClient
from connectors.es.sink import (
CREATES_QUEUED,
DELETES_QUEUED,
Expand Down Expand Up @@ -173,14 +171,6 @@ async def execute(self):
bulk_options = self.bulk_options.copy()
self.data_provider.tweak_bulk_options(bulk_options)

if (
self.connector.native
and self.connector.features.native_connector_api_keys_enabled()
and self.service_config.get("_use_native_connector_api_keys", True)
):
# Update the config so native connectors can use API key authentication during sync
await self._update_native_connector_authentication()

self.sync_orchestrator = SyncOrchestrator(
self.es_config, self.sync_job.logger
)
Expand Down Expand Up @@ -227,33 +217,6 @@ async def execute(self):
if self.data_provider is not None:
await self.data_provider.close()

async def _update_native_connector_authentication(self):
"""
The connector secrets API endpoint can only be accessed by the Enterprise Search system role,
so we need to use a client initialised with the config's username and password to first fetch
the API key for Elastic managed connectors.
After that, we can provide the API key to the sync orchestrator to initialise a new client
so that an API key can be used for the sync.
This function should not be run for connector clients.
"""
es_management_client = ESManagementClient(self.es_config)
try:
self.sync_job.log_debug(
f"Checking secrets storage for API key for index [{self.connector.index_name}]..."
)
api_key = await es_management_client.get_connector_secret(
self.connector.api_key_secret_id
)
self.sync_job.log_debug(
f"API key found in secrets storage for index [{self.connector.index_name}], will use this for authorization."
)
self.es_config["api_key"] = api_key
except ElasticNotFoundError as e:
msg = f"API key not found in secrets storage for index [{self.connector.index_name}]."
raise ApiKeyNotFoundError(msg) from e
finally:
await es_management_client.close()

def _data_source_framework_config(self):
builder = DataSourceFrameworkConfig.Builder().with_max_file_size(
self.service_config.get("max_file_download_size")
Expand Down
1 change: 1 addition & 0 deletions tests/agent/test_connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def test_ensure_connector_records_exist_creates_connectors_if_not_exist(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}",
is_native=True,
)


Expand Down
Loading

0 comments on commit 4d861cc

Please sign in to comment.