Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Add namespace permission grants
Browse files Browse the repository at this point in the history
Usage follows the description here: streamnative#8

```
resource "pulsar_namespace" "test" {
  ...
  permission_grant {
    role = "my-consumer"
    actions = ["consume"]
  }
}
```
  • Loading branch information
ypt committed Dec 21, 2020
1 parent d8a1b61 commit a9a996a
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 11 deletions.
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ Requirements
Installation
------------

* Clone this repository and cd into the directory
* Run `make build`, it will out a file named `terraform-provider-pulsar`
* Copy this `terraform-provider-pulsar` bin file to your [terraform plugin directory][third-party-plugins]
* Clone this repository and cd into the directory
* Run `make build`, it will out a file named `terraform-provider-pulsar`
* Copy this `terraform-provider-pulsar` bin file to your [terraform plugin directory][third-party-plugins]
* Typically this plugin directory is `~/.terraform.d/plugins/`
* On Linux based 64-bit devices, this directory can be `~/.terraform.d/plugins/linux_amd64`

Expand Down Expand Up @@ -206,19 +206,24 @@ resource "pulsar_namespace" "test" {
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
}
}
backlog_quota {
limit_bytes = "10000000000"
policy = "consumer_backlog_eviction"
}
persistence_policies {
bookkeeper_ensemble = 1 // Number of bookies to use for a topic, default: 0
bookkeeper_write_quorum = 1 // How many writes to make of each entry, default: 0
bookkeeper_ack_quorum = 1 // Number of acks (guaranteed copies) to wait for each entry, default: 0
managed_ledger_max_mark_delete_rate = 0.0 // Throttling rate of mark-delete operation (0 means no throttle), default: 0.0
}
permission_grant {
role = "some-role"
actions = ["produce", "consume", "functions"]
}
}
```

Expand All @@ -234,6 +239,7 @@ resource "pulsar_namespace" "test" {
| `retention_policies` | Data retention policies | No |
| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |

### `pulsar_topic`

Expand Down
117 changes: 114 additions & 3 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/hashicorp/go-multierror"

"github.com/streamnative/pulsarctl/pkg/pulsar/common"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/streamnative/terraform-provider-pulsar/types"

Expand Down Expand Up @@ -197,6 +198,28 @@ func resourcePulsarNamespace() *schema.Resource {
},
Set: persistencePoliciesToHash,
},
"permission_grant": {
Type: schema.TypeList,
Optional: true,
MinItems: 0,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"role": {
Type: schema.TypeString,
Required: true,
},
"actions": {
Type: schema.TypeSet,
Required: true,
MinItems: 1,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validateAuthAction,
},
},
},
},
},
},
}
}
Expand Down Expand Up @@ -225,7 +248,7 @@ func resourcePulsarNamespaceCreate(d *schema.ResourceData, meta interface{}) err
return fmt.Errorf("ERROR_CREATE_NAMESPACE: %w", err)
}

if err = resourcePulsarNamespaceUpdate(d, meta); err != nil {
if err := resourcePulsarNamespaceUpdate(d, meta); err != nil {
return fmt.Errorf("ERROR_CREATE_NAMESPACE_CONFIG: %w", err)
}

Expand Down Expand Up @@ -314,8 +337,8 @@ func resourcePulsarNamespaceRead(d *schema.ResourceData, meta interface{}) error

_ = d.Set("persistence_policies", schema.NewSet(retentionPoliciesToHash, []interface{}{
map[string]interface{}{
"retention_minutes": string(ret.RetentionTimeInMinutes),
"retention_size_in_mb": string(ret.RetentionSizeInMB),
"retention_minutes": fmt.Sprint(ret.RetentionTimeInMinutes),
"retention_size_in_mb": fmt.Sprint(ret.RetentionSizeInMB),
},
}))
}
Expand Down Expand Up @@ -349,6 +372,27 @@ func resourcePulsarNamespaceRead(d *schema.ResourceData, meta interface{}) error
}))
}

if permissionGrantCfg, ok := d.GetOk("permission_grant"); ok && len(permissionGrantCfg.([]interface{})) > 0 {
grants, err := client.GetNamespacePermissions(*ns)
if err != nil {
return fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespacePermissions: %w", err)
}

permissionGrants := []interface{}{}
for role, roleActions := range grants {
actions := []string{}
for _, action := range roleActions {
actions = append(actions, action.String())
}
permissionGrants = append(permissionGrants, map[string]interface{}{
"role": role,
"actions": actions,
})
}

_ = d.Set("permission_grant", schema.NewSet(permissionGrantToHash, permissionGrants))
}

return nil
}

Expand All @@ -363,6 +407,7 @@ func resourcePulsarNamespaceUpdate(d *schema.ResourceData, meta interface{}) err
backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set)
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").([]interface{})

nsName, err := utils.GetNameSpaceName(tenant, namespace)
if err != nil {
Expand Down Expand Up @@ -441,6 +486,37 @@ func resourcePulsarNamespaceUpdate(d *schema.ResourceData, meta interface{}) err
}
}

if d.HasChange("permission_grant") {
permissionGrants, err := unmarshalPermissionGrants(permissionGrantConfig)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("unmarshalPermissionGrants: %w", err))
} else {
for _, grant := range permissionGrants {
if err = client.GrantNamespacePermission(*nsName, grant.Role, grant.Actions); err != nil {
errs = multierror.Append(errs, fmt.Errorf("GrantNamespacePermission: %w", err))
}
}

// Revoke permissions for roles removed from the set
oldPermissionGrants, _ := d.GetChange("permission_grant")
for _, oldGrant := range oldPermissionGrants.([]interface{}) {
oldRole := oldGrant.(map[string]interface{})["role"].(string)
found := false
for _, newGrant := range permissionGrants {
if newGrant.Role == oldRole {
found = true
break
}
}
if !found {
if err = client.RevokeNamespacePermission(*nsName, oldRole); err != nil {
errs = multierror.Append(errs, fmt.Errorf("RevokeNamespacePermission: %w", err))
}
}
}
}
}

if errs != nil {
return fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs)
}
Expand Down Expand Up @@ -469,6 +545,7 @@ func resourcePulsarNamespaceDelete(d *schema.ResourceData, meta interface{}) err
_ = d.Set("backlog_quota", nil)
_ = d.Set("dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)

return nil
}
Expand Down Expand Up @@ -551,6 +628,16 @@ func persistencePoliciesToHash(v interface{}) int {
return hashcode.String(buf.String())
}

func permissionGrantToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

buf.WriteString(fmt.Sprintf("%s-", m["role"].(string)))
buf.WriteString(fmt.Sprintf("%s-", m["actions"].([]string)))

return hashcode.String(buf.String())
}

func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate {
var dispatchRate utils.DispatchRate

Expand Down Expand Up @@ -646,3 +733,27 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies {

return &persPolicies
}

func unmarshalPermissionGrants(v []interface{}) ([]*types.PermissionGrant, error) {
permissionGrants := make([]*types.PermissionGrant, 0, len(v))
for _, grant := range v {
data := grant.(map[string]interface{})

var permissionGrant types.PermissionGrant
permissionGrant.Role = data["role"].(string)

var actions []common.AuthAction
for _, action := range data["actions"].(*schema.Set).List() {
authAction, err := common.ParseAuthAction(action.(string))
if err != nil {
return nil, fmt.Errorf("ERROR_INVALID_AUTH_ACTION: %w", err)
}
actions = append(actions, authAction)
}
permissionGrant.Actions = actions

permissionGrants = append(permissionGrants, &permissionGrant)
}

return permissionGrants, nil
}
Loading

0 comments on commit a9a996a

Please sign in to comment.