Skip to content

Commit

Permalink
* [e2e] improve terraform fixtures and add more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lanfon72 committed Nov 14, 2023
1 parent ee12599 commit 7b7a406
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 155 deletions.
153 changes: 83 additions & 70 deletions harvester_e2e_tests/fixtures/terraform.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
import re
import json
import operator
from pathlib import Path
from datetime import datetime
from subprocess import run, PIPE
from dataclasses import dataclass
from dataclasses import dataclass, field

import pytest
from pkg_resources import parse_version

TF_PROVIDER = '''
terraform {{
required_version = "{tf_version}"
required_providers {{
harvester = {{
source = "{provider_source}"
version = "{provider_version}"
}}
}}
}}
provider "harvester" {{
kubeconfig = "{config_path}"
}}
terraform {
required_version = "%(tf_version)s"
required_providers {
harvester = {
source = "%(provider_source)s"
version = "%(provider_version)s"
}
}
}
provider "harvester" {
kubeconfig = "%(config_path)s"
}
'''


Expand Down Expand Up @@ -59,86 +58,94 @@ def tf_executor(tf_script_dir):

@pytest.fixture(scope="session")
def tf_harvester(api_client, tf_script_dir, tf_provider_version, tf_executor):
class TerraformHarvester:
def __init__(self, executor, workdir):
self.executor = executor.resolve()
self.workdir = workdir
self.workdir.mkdir(exist_ok=True)
harv = TerraformHarvester(tf_executor, tf_script_dir / datetime.now().strftime("%Hh%Mm_%m-%d"))
kuebconfig = api_client.generate_kubeconfig()
out, err, exc_code = harv.initial_provider(kuebconfig, tf_provider_version)
assert not err and 0 == exc_code
return harv

def exec_command(self, cmd, raw=False, **kws):
rv = run(cmd, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.workdir, **kws)

if raw:
return rv
return remove_ansicode(rv.stdout), remove_ansicode(rv.stderr), rv.returncode
@pytest.fixture(scope="session")
def tf_resource(tf_provider_version):
converter = Path("./terraform_test_artifacts/json2hcl")
return BaseTerraformResource.from_version(tf_provider_version)(converter)

def execute(self, cmd, raw=False, **kws):
return self.exec_command(f"{self.executor} {cmd}", raw=raw, **kws)

def initial_provider(self, kubeconfig, provider_version):
kubefile = self.workdir / ".kubeconfig"
with open(kubefile, "w") as f:
f.write(kubeconfig)
class TerraformHarvester:
def __init__(self, executor, workdir):
self.executor = executor.resolve()
self.workdir = workdir
self.workdir.mkdir(exist_ok=True)

with open(self.workdir / "provider.tf", "w") as f:
f.write(TF_PROVIDER.format(
tf_version=">=0.13", config_path=kubefile.resolve(),
provider_source="harvester/harvester", provider_version=provider_version
))
def exec_command(self, cmd, raw=False, **kws):
rv = run(cmd, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.workdir, **kws)

return self.execute("init")
if raw:
return rv
return remove_ansicode(rv.stdout), remove_ansicode(rv.stderr), rv.returncode

def save_as(self, content, filename, ext=".tf"):
filepath = self.workdir / f"{filename}{ext}"
with open(filepath, "w") as f:
f.write(content)
def execute(self, cmd, raw=False, **kws):
return self.exec_command(f"{self.executor} {cmd}", raw=raw, **kws)

def apply_resource(self, resource_type, resource_name):
return self.execute(f"apply -auto-approve -target {resource_type}.{resource_name}")
def initial_provider(self, kubeconfig, provider_version):
kubefile = self.workdir / "kubeconfig"
with open(kubefile, "w") as f:
f.write(kubeconfig)

def destroy_resource(self, resource_type, resource_name):
return self.execute(f"destroy -auto-approve -target {resource_type}.{resource_name}")
with open(self.workdir / "provider.tf", "w") as f:
f.write(TF_PROVIDER % dict(
tf_version=">=0.13", config_path=kubefile.resolve(),
provider_source="harvester/harvester", provider_version=provider_version
))

harv = TerraformHarvester(tf_executor, tf_script_dir / datetime.now().strftime("%Hh%Mm_%m-%d"))
kuebconfig = api_client.generate_kubeconfig()
out, err, exc_code = harv.initial_provider(kuebconfig, tf_provider_version)
assert not err and 0 == exc_code
return self.execute("init")

return harv
def save_as(self, content, filename, ext=".tf"):
filepath = self.workdir / f"{filename}{ext}"
with open(filepath, "w") as f:
f.write(content)

def apply_resource(self, resource_type, resource_name):
return self.execute(f"apply -auto-approve -target {resource_type}.{resource_name}")

@pytest.fixture(scope="session")
def tf_resource(tf_provider_version):
converter = Path("./terraform_test_artifacts/json2hcl")
return BaseTerraformResource.from_version(tf_provider_version)(converter)
def destroy_resource(self, resource_type, resource_name):
return self.execute(f"destroy -auto-approve -target {resource_type}.{resource_name}")


@dataclass
class ResourceContext:
type: str
name: str
ctx: str
raw: dict = field(default_factory=dict, compare=False)


class BaseTerraformResource:
#: :type: Tuple[str, callable[(str, str), bool]]
#: Be used to adjust whether the class is support to specific version,
#: this would be used in cls.is_support, worked as `version op target`
support_to = ("0.0.0", operator.eq)
#: Be used to store sub classes of BaseTerraformResource
#: Type: Dict[Type[BaseTerraformResource], List[Type[BaseTerraformResource]]]
_sub_classes = dict()

#: Be used to adjust whether the class is support to specific version
#: Type: str
support_to = "0.0.0"

@classmethod
def is_support(cls, target_version):
version, comparator = cls.support_to
return comparator(parse_version(version), parse_version(target_version))
return parse_version(target_version) >= parse_version(cls.support_to)

@classmethod
def from_version(cls, version):
for c in cls.__subclasses__()[::-1]:
subcls = c.from_version(version)
if subcls.is_support(version):
return subcls
for c in sorted(cls._sub_classes.get(cls, []),
reverse=True, key=lambda x: parse_version(x.support_to).release):
if c.is_support(version):
return c
return cls

def __init_subclass__(cls):
for parent in cls.__mro__:
if issubclass(parent, BaseTerraformResource):
cls._sub_classes.setdefault(parent, []).append(cls)

def __init__(self, converter):
self.executor = Path(converter).resolve()

Expand All @@ -152,17 +159,19 @@ def convert_to_hcl(self, json_spec, raw=False):
out = rv.stdout.decode()
out = re.sub(r'"resource"', "resource", out) # resource should not quote
out = re.sub(r"\"(.+?)\" =", r"\1 =", out) # property should not quote
out = re.sub(r"(.[^ ]+) = {", r"\1 {", out) # block should not have `=`
return out

def make_resource(self, resource_type, resource_name, *, convert=True, **properties):
rv = dict(resource={resource_type: {resource_name: properties}})
if convert:
return ResourceContext(resource_type, resource_name, self.convert_to_hcl(rv))
return ResourceContext(resource_type, resource_name, self.convert_to_hcl(rv), rv)
return rv


class TerraformResource(BaseTerraformResource):
support_to = ("0.0.0", operator.le)
''' https://github.com/harvester/terraform-provider-harvester/blob/v0.1.0/docs/resources/ '''
support_to = "0.1.0"

def ssh_key(self, resource_name, name, public_key, *, convert=True, **properties):
return self.make_resource(
Expand Down Expand Up @@ -207,12 +216,14 @@ def virtual_machine(self, resource_name, name, disks, nics, *, convert=True, **p

def network(self, resource_name, name, vlan_id, *, convert=True, **properties):
return self.make_resource(
"harvester_network", resource_name, vlan_id=vlan_id, convert=convert, **properties
"harvester_network", resource_name, name=name, vlan_id=vlan_id,
convert=convert, **properties
)


class TerraformResource_060(TerraformResource):
support_to = ("0.6.0", operator.le)
''' https://github.com/harvester/terraform-provider-harvester/blob/v0.6.0/docs/resources/ '''
support_to = "0.6.0"

def storage_class(
self, resource_name, name, replicas=1, stale_timeout=30, migratable="true",
Expand All @@ -231,7 +242,7 @@ def storage_class(

def cluster_network(self, resource_name, name, *, convert=True, **properties):
return self.make_resource(
"harvester_clusternetwork", resource_name, convert=convert, **properties
"harvester_clusternetwork", resource_name, name=name, convert=convert, **properties
)

def vlanconfig(
Expand All @@ -255,7 +266,9 @@ def network(


class TerraformResource_063(TerraformResource_060):
support_to = ("0.6.3", operator.le)
''' https://github.com/harvester/terraform-provider-harvester/blob/v0.6.3/docs/resources/ '''

support_to = "0.6.3"

def cloudinit_secret(
self, resource_name, name, user_data="", network_data="", *, convert=True, **properties
Expand Down
82 changes: 0 additions & 82 deletions harvester_e2e_tests/integration/test_terraform.py

This file was deleted.

Loading

0 comments on commit 7b7a406

Please sign in to comment.