Skip to content

Commit

Permalink
Add namespace permission grants (#23)
Browse files Browse the repository at this point in the history
Usage follows the description here: #8

```
resource "pulsar_namespace" "test" {
  ...
  permission_grant {
    role = "my-consumer"
    actions = ["consume"]
  }
}
```
  • Loading branch information
ypt authored Jan 5, 2021
1 parent 3bd77d9 commit 3434003
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/acctest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
export PATH=$PATH:$(pwd)/bin
wget https://releases.hashicorp.com/terraform/0.12.17/terraform_0.12.17_linux_amd64.zip
unzip terraform_0.12.17_linux_amd64.zip && sudo mv terraform /usr/local/bin
docker run -d -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data apachepulsar/pulsar:2.4.1 bin/pulsar standalone
docker run -d -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data apachepulsar/pulsar:2.7.0 bin/pulsar standalone
mkdir -p $HOME/.terraform.d/plugins/linux_amd64
sleep 10
Expand Down
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ Requirements
Installation
------------

* Clone this repository and cd into the directory
* Run `make build`, it will out a file named `terraform-provider-pulsar`
* Copy this `terraform-provider-pulsar` bin file to your [terraform plugin directory][third-party-plugins]
* Clone this repository and cd into the directory
* Run `make build`, it will out a file named `terraform-provider-pulsar`
* Copy this `terraform-provider-pulsar` bin file to your [terraform plugin directory][third-party-plugins]
* Typically this plugin directory is `~/.terraform.d/plugins/`
* On Linux based 64-bit devices, this directory can be `~/.terraform.d/plugins/linux_amd64`

Expand Down Expand Up @@ -206,19 +206,24 @@ resource "pulsar_namespace" "test" {
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
}
}
backlog_quota {
limit_bytes = "10000000000"
policy = "consumer_backlog_eviction"
}
persistence_policies {
bookkeeper_ensemble = 1 // Number of bookies to use for a topic, default: 0
bookkeeper_write_quorum = 1 // How many writes to make of each entry, default: 0
bookkeeper_ack_quorum = 1 // Number of acks (guaranteed copies) to wait for each entry, default: 0
managed_ledger_max_mark_delete_rate = 0.0 // Throttling rate of mark-delete operation (0 means no throttle), default: 0.0
}
permission_grant {
role = "some-role"
actions = ["produce", "consume", "functions"]
}
}
```

Expand All @@ -234,6 +239,7 @@ resource "pulsar_namespace" "test" {
| `retention_policies` | Data retention policies | No |
| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |

### `pulsar_topic`

Expand Down
113 changes: 112 additions & 1 deletion pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/hashicorp/go-multierror"

"github.com/streamnative/pulsarctl/pkg/pulsar/common"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/streamnative/terraform-provider-pulsar/types"

Expand Down Expand Up @@ -197,6 +198,28 @@ func resourcePulsarNamespace() *schema.Resource {
},
Set: persistencePoliciesToHash,
},
"permission_grant": {
Type: schema.TypeList,
Optional: true,
MinItems: 0,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"role": {
Type: schema.TypeString,
Required: true,
},
"actions": {
Type: schema.TypeSet,
Required: true,
MinItems: 1,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validateAuthAction,
},
},
},
},
},
},
}
}
Expand Down Expand Up @@ -225,7 +248,7 @@ func resourcePulsarNamespaceCreate(d *schema.ResourceData, meta interface{}) err
return fmt.Errorf("ERROR_CREATE_NAMESPACE: %w", err)
}

if err = resourcePulsarNamespaceUpdate(d, meta); err != nil {
if err := resourcePulsarNamespaceUpdate(d, meta); err != nil {
return fmt.Errorf("ERROR_CREATE_NAMESPACE_CONFIG: %w", err)
}

Expand Down Expand Up @@ -349,6 +372,27 @@ func resourcePulsarNamespaceRead(d *schema.ResourceData, meta interface{}) error
}))
}

if permissionGrantCfg, ok := d.GetOk("permission_grant"); ok && len(permissionGrantCfg.([]interface{})) > 0 {
grants, err := client.GetNamespacePermissions(*ns)
if err != nil {
return fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespacePermissions: %w", err)
}

permissionGrants := []interface{}{}
for role, roleActions := range grants {
actions := []string{}
for _, action := range roleActions {
actions = append(actions, action.String())
}
permissionGrants = append(permissionGrants, map[string]interface{}{
"role": role,
"actions": actions,
})
}

_ = d.Set("permission_grant", schema.NewSet(permissionGrantToHash, permissionGrants))
}

return nil
}

Expand All @@ -363,6 +407,7 @@ func resourcePulsarNamespaceUpdate(d *schema.ResourceData, meta interface{}) err
backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set)
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").([]interface{})

nsName, err := utils.GetNameSpaceName(tenant, namespace)
if err != nil {
Expand Down Expand Up @@ -441,6 +486,37 @@ func resourcePulsarNamespaceUpdate(d *schema.ResourceData, meta interface{}) err
}
}

if d.HasChange("permission_grant") {
permissionGrants, err := unmarshalPermissionGrants(permissionGrantConfig)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("unmarshalPermissionGrants: %w", err))
} else {
for _, grant := range permissionGrants {
if err = client.GrantNamespacePermission(*nsName, grant.Role, grant.Actions); err != nil {
errs = multierror.Append(errs, fmt.Errorf("GrantNamespacePermission: %w", err))
}
}

// Revoke permissions for roles removed from the set
oldPermissionGrants, _ := d.GetChange("permission_grant")
for _, oldGrant := range oldPermissionGrants.([]interface{}) {
oldRole := oldGrant.(map[string]interface{})["role"].(string)
found := false
for _, newGrant := range permissionGrants {
if newGrant.Role == oldRole {
found = true
break
}
}
if !found {
if err = client.RevokeNamespacePermission(*nsName, oldRole); err != nil {
errs = multierror.Append(errs, fmt.Errorf("RevokeNamespacePermission: %w", err))
}
}
}
}
}

if errs != nil {
return fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs)
}
Expand Down Expand Up @@ -469,6 +545,7 @@ func resourcePulsarNamespaceDelete(d *schema.ResourceData, meta interface{}) err
_ = d.Set("backlog_quota", nil)
_ = d.Set("dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)

return nil
}
Expand Down Expand Up @@ -551,6 +628,16 @@ func persistencePoliciesToHash(v interface{}) int {
return hashcode.String(buf.String())
}

func permissionGrantToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

buf.WriteString(fmt.Sprintf("%s-", m["role"].(string)))
buf.WriteString(fmt.Sprintf("%s-", m["actions"].([]string)))

return hashcode.String(buf.String())
}

func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate {
var dispatchRate utils.DispatchRate

Expand Down Expand Up @@ -646,3 +733,27 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies {

return &persPolicies
}

func unmarshalPermissionGrants(v []interface{}) ([]*types.PermissionGrant, error) {
permissionGrants := make([]*types.PermissionGrant, 0, len(v))
for _, grant := range v {
data := grant.(map[string]interface{})

var permissionGrant types.PermissionGrant
permissionGrant.Role = data["role"].(string)

var actions []common.AuthAction
for _, action := range data["actions"].(*schema.Set).List() {
authAction, err := common.ParseAuthAction(action.(string))
if err != nil {
return nil, fmt.Errorf("ERROR_INVALID_AUTH_ACTION: %w", err)
}
actions = append(actions, authAction)
}
permissionGrant.Actions = actions

permissionGrants = append(permissionGrants, &permissionGrant)
}

return permissionGrants, nil
}
Loading

0 comments on commit 3434003

Please sign in to comment.