This repository has been archived by the owner on Feb 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement click custom options/arguments/commands
- Loading branch information
1 parent
4ede698
commit 24b6a9a
Showing
2 changed files
with
183 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
""" | ||
Custom defined Click commands/options/etc. | ||
""" | ||
|
||
import click | ||
|
||
from griffon.exceptions import GriffonUsageError | ||
from griffon.helpers import Style | ||
|
||
|
||
class ListParamType(click.ParamType): | ||
"""Custom comma-separated list type""" | ||
|
||
name = "list" | ||
|
||
def convert(self, value, param, ctx): | ||
if value is None: | ||
return [] | ||
return value.split(",") | ||
|
||
|
||
class BaseGroupParameter: | ||
""" | ||
Custom base parameter which handles: | ||
* mutually exclusive options | ||
* required group options (one of the group is required) | ||
""" | ||
|
||
def handle_parse_result(self, ctx, opts, args): | ||
if self.mutually_exclusive_group: | ||
if opts.get(self.name) is None: | ||
pass # skip check for not supplied click.Arguments | ||
elif self.name in opts and any( | ||
opt in opts for opt in self.mutually_exclusive_group if opts.get(opt) is not None | ||
): | ||
raise GriffonUsageError( | ||
( | ||
f"{Style.BOLD}{self.name} cannot be used with " | ||
f"{', '.join(self.mutually_exclusive_group)}.{Style.RESET}" | ||
), | ||
ctx=ctx, | ||
) | ||
|
||
if self.required_group: | ||
group_set = set( | ||
opt for opt in opts if opt in self.required_group and opts.get(opt) is not None | ||
) | ||
if not any(group_set): | ||
raise GriffonUsageError( | ||
f"{Style.BOLD}At least one of {', '.join(self.required_group)} " | ||
f"is required.{Style.RESET}", | ||
ctx=ctx, | ||
) | ||
|
||
return super().handle_parse_result(ctx, opts, args) | ||
|
||
|
||
class GroupOption(BaseGroupParameter, click.Option): | ||
"""Custom Option with BaseGroupParameter functionality""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
self.mutually_exclusive_group = set(kwargs.pop("mutually_exclusive_group", [])) | ||
self.required_group = set(kwargs.pop("required_group", [])) | ||
|
||
if self.mutually_exclusive_group: | ||
mutually_exclusive_str = ", ".join(self.mutually_exclusive_group) | ||
kwargs["help"] = kwargs.get("help", "") + ( | ||
f", this argument is mutually exclusive " | ||
f"with arguments: {Style.BOLD}[{mutually_exclusive_str}]{Style.RESET}" | ||
) | ||
|
||
if self.required_group: | ||
required_str = ", ".join(self.required_group) | ||
kwargs["help"] = kwargs.get("help", "") + ( | ||
f", at least one of these arguments: " | ||
f"{Style.BOLD}[{required_str}]{Style.RESET} is required." | ||
) | ||
|
||
super().__init__(*args, **kwargs) | ||
|
||
|
||
class GroupArgument(BaseGroupParameter, click.Argument): | ||
"""Custom Argument with BaseGroupParameter functionality""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
self.mutually_exclusive_group = set(kwargs.pop("mutually_exclusive_group", [])) | ||
self.required_group = set(kwargs.pop("required_group", [])) | ||
|
||
super().__init__(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
get_product_stream_ofuris, | ||
get_product_version_names, | ||
) | ||
from griffon.commands.custom_commands import GroupArgument, GroupOption | ||
from griffon.commands.entities.corgi import ( | ||
get_component_manifest, | ||
get_component_summary, | ||
|
@@ -157,10 +158,17 @@ def retrieve_component_summary(ctx, component_name, strict_name_search): | |
) | ||
@click.argument( | ||
"component_name", | ||
cls=GroupArgument, | ||
required=False, | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["purl"], | ||
) | ||
@click.option( | ||
"--purl", help="Component purl, needs to be in quotes (ex. 'pkg:rpm/[email protected]')" | ||
"--purl", | ||
cls=GroupOption, | ||
help="Component purl, needs to be in quotes (ex. 'pkg:rpm/[email protected]')", | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["component_name"], | ||
) | ||
@click.option( | ||
"--arch", | ||
|
@@ -361,11 +369,6 @@ def get_product_contain_component( | |
"""List products of a latest component.""" | ||
if verbose: | ||
ctx.obj["VERBOSE"] = verbose | ||
if not purl and not component_name: | ||
click.echo(ctx.get_help()) | ||
click.echo("") | ||
click.echo(f"{Style.BOLD}Must supply Component name or --purl.{Style.RESET}") | ||
exit(0) | ||
|
||
if ( | ||
not search_latest | ||
|
@@ -592,8 +595,19 @@ def get_product_contain_component( | |
name="components-contain-component", | ||
help="List Components containing Component.", | ||
) | ||
@click.argument("component_name", required=False) | ||
@click.option("--purl") | ||
@click.argument( | ||
"component_name", | ||
cls=GroupArgument, | ||
required=False, | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["purl"], | ||
) | ||
@click.option( | ||
"--purl", | ||
cls=GroupOption, | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["component_name"], | ||
) | ||
@click.option("--type", "component_type", type=click.Choice(CorgiService.get_component_types())) | ||
@click.option("--version", "component_version") | ||
@click.option( | ||
|
@@ -633,9 +647,6 @@ def get_component_contain_component( | |
"""List components that contain component.""" | ||
if verbose: | ||
ctx.obj["VERBOSE"] = verbose | ||
if not component_name and not purl: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
if component_name: | ||
q = query_service.invoke(core_queries.components_containing_component_query, ctx.params) | ||
cprint(q, ctx=ctx) | ||
|
@@ -650,8 +661,23 @@ def get_component_contain_component( | |
name="product-manifest", | ||
help="Get Product manifest (includes Root Components and all dependencies).", | ||
) | ||
@click.argument("product_stream_name", required=False, shell_complete=get_product_stream_names) | ||
@click.option("--ofuri", "ofuri", type=click.STRING, shell_complete=get_product_stream_ofuris) | ||
@click.argument( | ||
"product_stream_name", | ||
cls=GroupArgument, | ||
required=False, | ||
shell_complete=get_product_stream_names, | ||
required_group=["ofuri", "product_stream_name"], | ||
mutually_exclusive_group=["ofuri"], | ||
) | ||
@click.option( | ||
"--ofuri", | ||
"ofuri", | ||
cls=GroupOption, | ||
type=click.STRING, | ||
shell_complete=get_product_stream_ofuris, | ||
required_group=["ofuri", "product_stream_name"], | ||
mutually_exclusive_group=["product_stream_name"], | ||
) | ||
@click.option( | ||
"--spdx-json", | ||
"spdx_json_format", | ||
|
@@ -662,10 +688,6 @@ def get_component_contain_component( | |
@click.pass_context | ||
def get_product_manifest_query(ctx, product_stream_name, ofuri, spdx_json_format): | ||
"""List components of a specific product version.""" | ||
if not ofuri and not product_stream_name: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
|
||
if spdx_json_format: | ||
ctx.ensure_object(dict) | ||
ctx.obj["FORMAT"] = "json" # TODO - investigate if we need yaml format. | ||
|
@@ -682,8 +704,23 @@ def get_product_manifest_query(ctx, product_stream_name, ofuri, spdx_json_format | |
name="product-components", | ||
help="List LATEST Root Components of Product.", | ||
) | ||
@click.argument("product_stream_name", required=False, shell_complete=get_product_stream_names) | ||
@click.option("--ofuri", "ofuri", type=click.STRING, shell_complete=get_product_stream_ofuris) | ||
@click.argument( | ||
"product_stream_name", | ||
cls=GroupArgument, | ||
required=False, | ||
shell_complete=get_product_stream_names, | ||
required_group=["ofuri", "product_stream_name"], | ||
mutually_exclusive_group=["ofuri"], | ||
) | ||
@click.option( | ||
"--ofuri", | ||
"ofuri", | ||
cls=GroupOption, | ||
type=click.STRING, | ||
shell_complete=get_product_stream_ofuris, | ||
required_group=["ofuri", "product_stream_name"], | ||
mutually_exclusive_group=["product_stream_name"], | ||
) | ||
@query_params_options( | ||
entity="Component", | ||
endpoint_module=v1_components_list, | ||
|
@@ -704,9 +741,6 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose | |
if verbose: | ||
ctx.obj["VERBOSE"] = verbose | ||
ctx.params.pop("verbose") | ||
if not ofuri and not product_stream_name: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
if ofuri: | ||
params["ofuri"] = ofuri | ||
if product_stream_name: | ||
|
@@ -724,8 +758,20 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose | |
name="component-manifest", | ||
help="Get Component manifest.", | ||
) | ||
@click.option("--uuid", "component_uuid") | ||
@click.option("--purl", help="Component Purl (must be quoted).") | ||
@click.option( | ||
"--uuid", | ||
"component_uuid", | ||
cls=GroupOption, | ||
required_group=["component_uuid", "purl"], | ||
mutually_exclusive_group=["purl"], | ||
) | ||
@click.option( | ||
"--purl", | ||
cls=GroupOption, | ||
required_group=["component_uuid", "purl"], | ||
mutually_exclusive_group=["component_uuid"], | ||
help="Component Purl (must be quoted).", | ||
) | ||
@click.option( | ||
"--spdx-json", | ||
"spdx_json_format", | ||
|
@@ -736,9 +782,6 @@ def get_product_latest_components_query(ctx, product_stream_name, ofuri, verbose | |
@click.pass_context | ||
def retrieve_component_manifest(ctx, component_uuid, purl, spdx_json_format): | ||
"""Retrieve Component manifest.""" | ||
if not component_uuid and not purl: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
if spdx_json_format: | ||
ctx.ensure_object(dict) | ||
ctx.obj["FORMAT"] = "json" | ||
|
@@ -754,7 +797,7 @@ def retrieve_component_manifest(ctx, component_uuid, purl, spdx_json_format): | |
name="components-affected-by-flaw", | ||
help="List Components affected by Flaw.", | ||
) | ||
@click.argument("cve_id", required=False, type=click.STRING, shell_complete=get_cve_ids) | ||
@click.argument("cve_id", required=True, type=click.STRING, shell_complete=get_cve_ids) | ||
@click.option( | ||
"--affectedness", | ||
help="Filter by Affect affectedness.", | ||
|
@@ -789,9 +832,6 @@ def components_affected_by_specific_cve_query( | |
ctx, cve_id, affectedness, affect_resolution, affect_impact, component_type, namespace | ||
): | ||
"""List components affected by specific CVE.""" | ||
if not cve_id: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
q = query_service.invoke(core_queries.components_affected_by_specific_cve_query, ctx.params) | ||
cprint(q, ctx=ctx) | ||
|
||
|
@@ -800,23 +840,31 @@ def components_affected_by_specific_cve_query( | |
name="products-affected-by-flaw", | ||
help="List Products affected by Flaw.", | ||
) | ||
@click.argument("cve_id", required=False, type=click.STRING, shell_complete=get_cve_ids) | ||
@click.argument("cve_id", required=True, type=click.STRING, shell_complete=get_cve_ids) | ||
@click.pass_context | ||
@progress_bar | ||
def product_versions_affected_by_cve_query(ctx, cve_id): | ||
"""List Products affected by a CVE.""" | ||
if not cve_id: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
q = query_service.invoke( | ||
core_queries.products_versions_affected_by_specific_cve_query, ctx.params | ||
) | ||
cprint(q, ctx=ctx) | ||
|
||
|
||
@queries_grp.command(name="component-flaws", help="List Flaws affecting a Component.") | ||
@click.argument("component_name", required=False) | ||
@click.option("--purl") | ||
@click.argument( | ||
"component_name", | ||
cls=GroupArgument, | ||
required=False, | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["purl"], | ||
) | ||
@click.option( | ||
"--purl", | ||
cls=GroupOption, | ||
required_group=["component_name", "purl"], | ||
mutually_exclusive_group=["component_name"], | ||
) | ||
@click.option( | ||
"--flaw-impact", | ||
"flaw_impact", | ||
|
@@ -867,10 +915,6 @@ def cves_for_specific_component_query( | |
strict_name_search, | ||
): | ||
"""List flaws of a specific component.""" | ||
if not purl and not component_name: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
|
||
q = query_service.invoke(core_queries.cves_for_specific_component_query, ctx.params) | ||
cprint(q, ctx=ctx) | ||
|
||
|
@@ -883,9 +927,17 @@ def cves_for_specific_component_query( | |
"product_version_name", | ||
required=False, | ||
type=click.STRING, | ||
cls=GroupArgument, | ||
shell_complete=get_product_version_names, | ||
required_group=["ofuri", "product_version_name"], | ||
mutually_exclusive_group=["ofuri"], | ||
) | ||
@click.option( | ||
"--ofuri", | ||
cls=GroupOption, | ||
required_group=["ofuri", "product_version_name"], | ||
mutually_exclusive_group=["product_version_name"], | ||
) | ||
@click.option("--ofuri") | ||
@click.option( | ||
"--flaw-impact", | ||
"flaw_impact", | ||
|
@@ -936,8 +988,5 @@ def cves_for_specific_product_query( | |
strict_name_search, | ||
): | ||
"""List flaws of a specific product.""" | ||
if not product_version_name and not ofuri: | ||
click.echo(ctx.get_help()) | ||
exit(0) | ||
q = query_service.invoke(core_queries.cves_for_specific_product_query, ctx.params) | ||
cprint(q, ctx=ctx) |