Skip to content

Commit

Permalink
Add Spot/On-demand price to table-wide output irrespective of the pri…
Browse files Browse the repository at this point in the history
…ce filters (#106)

* Add Spot/On-demand price to table-wide output irrespective of the price filters

* Minor clean-up

* adjust ec2pricing imports

* remove the importing of un-used Lightsail packages, as [requested by @bwagner](#78 (comment))
* move non-standard lib imports to their own stanza, as [requested by @bwagner](#78 (comment))

Signed-off-by: Mike Ball <[email protected]>

* rename aZones var to 'availabilityZones'

This was requested by @bwagner here:
#78 (comment)

Signed-off-by: Mike Ball <[email protected]>

* rename fields/methods `*CacheUTC`

Per code review from @bwagner, this renames various
methods and fields to be `*CacheUTC` rather than `*CachedUTC`:
#78 (comment)

* update selector to use `LastSpotCacheUTC()`

Previously, `selector` was incorrectly using the old
`LastSpotCachedUTC` method name.

* make `ec2PricingMock` proper `EC2PricingIface`

* remove invalid 'FIXME' comment

This is now fixed, as the Pricing client is always initialized to us-east-1.

Signed-off-by: Mike Ball <[email protected]>

* move error check inside loop

Per [code review feedback](#106 (comment)), the error checking
should appear within the `range pricingOutput.PriceList`.

Signed-off-by: Mike Ball <[email protected]>

* remove invalid 'FIXME' code comment

This issue is now resolved.

Signed-off-by: Mike Ball <[email protected]>

* commit `go mod tidy` results

Signed-off-by: Mike Ball <[email protected]>

* protect against panics during unit tests

This protects against panics like the following, for example:

```
--- FAIL: TestFilter_X8664_AMD64 (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x38 pc=0x153ec41]

goroutine 153 [running]:
testing.tRunner.func1.2({0x1613ee0, 0x1cdfe70})
        /usr/local/Cellar/go/1.17.2/libexec/src/testing/testing.go:1209 +0x24e
testing.tRunner.func1()
        /usr/local/Cellar/go/1.17.2/libexec/src/testing/testing.go:1212 +0x218
panic({0x1613ee0, 0x1cdfe70})
        /usr/local/Cellar/go/1.17.2/libexec/src/runtime/panic.go:1038 +0x215
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector.Selector.rawFilter.func1(0x100e6e7, 0x68)
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector.go:192 +0x241
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector_test.mockedEC2.DescribeInstanceTypesPages(...)
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector_test.go:70
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector.Selector.rawFilter({{_, _}, {_, _}, {_}}, {0x0, 0x0, 0x0, 0xc0003854a0, 0x0, ...})
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector.go:184 +0x5c2
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector.Selector.FilterWithOutput({{_, _}, {_, _}, {_}}, {0x0, 0x0, 0x0, 0xc0003854a0, 0x0, ...}, ...)
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector.go:116 +0xc5
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector.Selector.Filter({{_, _}, {_, _}, {_}}, {0x0, 0x0, 0x0, 0xc0003854a0, 0x0, ...})
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector.go:98 +0xcc
github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector_test.TestFilter_X8664_AMD64(0xc0002de820)
        /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector/pkg/selector/selector_test.go:583 +0x14e
testing.tRunner(0xc00059b520, 0x177e1b0)
        /usr/local/Cellar/go/1.17.2/libexec/src/testing/testing.go:1259 +0x102
created by testing.(*T).Run
        /usr/local/Cellar/go/1.17.2/libexec/src/testing/testing.go:1306 +0x35a
exit status 2
```

Signed-off-by: Mike Ball <[email protected]>

* report received value in test output

This seeks to make the specifics of test failures a bit more clear.

Signed-off-by: Mike Ball <[email protected]>

* run `go mod tidy`

This addresses the following:

```
$ make unit-test
go test -bench=. /Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector//...  -v -coverprofile=coverage.out -covermode=atomic -outputdir=/Users/mball/dev/go/src/github.com/aws/amazon-ec2-instance-selector//build
go: updates to go.mod needed; to update it:
        go mod tidy
make: *** [unit-test] Error 1
```

Signed-off-by: Mike Ball <[email protected]>

* fix failing selector.Filter unit tests

By populating the `lastOnDemandCacheUTC` and `lastSpotCacheUTC`
fields with non-`nil` values, the `Filter` tests exercise relevant
code paths.

Note that this doesn't change @krishna-birla's original implementation
(see PR #78) or vision; it only ensures the tests pass.

Signed-off-by: Mike Ball <[email protected]>

Co-authored-by: Krishna Birla <[email protected]>
  • Loading branch information
mdb and Krishna Birla authored Nov 15, 2021
1 parent 4448edf commit 7578114
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 106 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
### Go ###
# IDE
.idea

# Binaries for programs and plugins
*.exe
*.exe~
Expand Down
28 changes: 23 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"log"
"os"
"strings"
"sync"

commandline "github.com/aws/amazon-ec2-instance-selector/v2/pkg/cli"
"github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector"
Expand Down Expand Up @@ -187,11 +188,29 @@ Full docs can be found at github.com/aws/amazon-` + binName
flags[region] = sess.Config.Region

instanceSelector := selector.New(sess)
if _, ok := flags[pricePerHour]; ok {
if flags[usageClass] == nil || *flags[usageClass].(*string) == "on-demand" {
instanceSelector.EC2Pricing.HydrateOndemandCache()
outputFlag := cli.StringMe(flags[output])
if outputFlag != nil && *outputFlag == tableWideOutput {
// If output type is `table-wide`, simply print both prices for better comparison,
// even if the actual filter is applied on any one of those based on usage class

// Save time by hydrating in parallel
wg := &sync.WaitGroup{}
wg.Add(2)
go func(waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
_ = instanceSelector.EC2Pricing.HydrateOndemandCache()
}(wg)
go func(waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
_ = instanceSelector.EC2Pricing.HydrateSpotCache(30)
}(wg)
wg.Wait()
} else if flags[pricePerHour] != nil {
// Else, if price filters are applied, only hydrate the respective cache as we don't have to print the prices
if flags[usageClass] == nil || *cli.StringMe(flags[usageClass]) == "on-demand" {
_ = instanceSelector.EC2Pricing.HydrateOndemandCache()
} else {
instanceSelector.EC2Pricing.HydrateSpotCache(30)
_ = instanceSelector.EC2Pricing.HydrateSpotCache(30)
}
}

Expand Down Expand Up @@ -252,7 +271,6 @@ Full docs can be found at github.com/aws/amazon-` + binName
}
}

outputFlag := cli.StringMe(flags[output])
outputFn := getOutputFn(outputFlag, selector.InstanceTypesOutputFn(resultsOutputFn))

instanceTypes, itemsTruncated, err := instanceSelector.FilterWithOutput(filters, outputFn)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ require (
github.com/hashicorp/hcl v1.0.0
github.com/imdario/mergo v0.3.11
github.com/mitchellh/go-homedir v1.1.0
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/cobra v0.0.7
github.com/spf13/pflag v1.0.3
go.uber.org/multierr v1.1.0
gopkg.in/ini.v1 v1.57.0
)

require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
go.uber.org/atomic v1.4.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,16 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
136 changes: 89 additions & 47 deletions pkg/ec2pricing/ec2pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"go.uber.org/multierr"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
Expand All @@ -25,19 +27,25 @@ const (

// EC2Pricing is the public struct to interface with AWS pricing APIs
type EC2Pricing struct {
PricingClient pricingiface.PricingAPI
EC2Client ec2iface.EC2API
AWSSession *session.Session
cache map[string]float64
spotCache map[string]map[string][]spotPricingEntry
PricingClient pricingiface.PricingAPI
EC2Client ec2iface.EC2API
AWSSession *session.Session
onDemandCache map[string]float64
spotCache map[string]map[string][]spotPricingEntry
lastOnDemandCacheUTC *time.Time // Updated on successful cache write
lastSpotCacheUTC *time.Time // Updated on successful cache write
}

// EC2PricingIface is the EC2Pricing interface mainly used to mock out ec2pricing during testing
type EC2PricingIface interface {
GetOndemandInstanceTypeCost(instanceType string) (float64, error)
GetSpotInstanceTypeNDayAvgCost(instanceType string, availabilityZones []string, days int) (float64, error)
// Keep hydrate functions thread safe by keeping different write data points
// In simple words, make sure they don't write the same variable/file/row etc. which they don't (they have different cache maps)
HydrateOndemandCache() error
HydrateSpotCache(days int) error
LastOnDemandCacheUTC() *time.Time
LastSpotCacheUTC() *time.Time
}

type spotPricingEntry struct {
Expand All @@ -49,12 +57,26 @@ type spotPricingEntry struct {
func New(sess *session.Session) *EC2Pricing {
return &EC2Pricing{
// use us-east-1 since pricing only has endpoints in us-east-1 and ap-south-1
PricingClient: pricing.New(sess.Copy(aws.NewConfig().WithRegion("us-east-1"))),
EC2Client: ec2.New(sess),
AWSSession: sess,
PricingClient: pricing.New(sess.Copy(aws.NewConfig().WithRegion("us-east-1"))),
EC2Client: ec2.New(sess),
AWSSession: sess,
lastOnDemandCacheUTC: nil,
lastSpotCacheUTC: nil,
}
}

// LastOnDemandCacheUTC returns the UTC timestamp when the onDemandCache was last refreshed
// Returns nil if the onDemandCache has not been initialized
func (p *EC2Pricing) LastOnDemandCacheUTC() *time.Time {
return p.lastOnDemandCacheUTC
}

// LastSpotCacheUTC returns the UTC timestamp when the spotCache was last refreshed
// Returns nil if the spotCache has not been initialized
func (p *EC2Pricing) LastSpotCacheUTC() *time.Time {
return p.lastSpotCacheUTC
}

// GetSpotInstanceTypeNDayAvgCost retrieves the spot price history for a given AZ from the past N days and averages the price
// Passing an empty list for availabilityZones will retrieve avg cost for all AZs in the current AWSSession's region
func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availabilityZones []string, days int) (float64, error) {
Expand All @@ -67,28 +89,31 @@ func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availab
EndTime: &endTime,
InstanceTypes: []*string{&instanceType},
}
zoneToPriceEntries := map[string][]spotPricingEntry{}
zoneToPriceEntries := make(map[string][]spotPricingEntry)

if _, ok := p.spotCache[instanceType]; !ok {
var processingErr error
err := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
errAPI := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
for _, history := range dspho.SpotPriceHistory {
var spotPrice float64
spotPrice, processingErr = strconv.ParseFloat(*history.SpotPrice, 64)
spotPrice, errParse := strconv.ParseFloat(*history.SpotPrice, 64)
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
continue
}
zone := *history.AvailabilityZone

zoneToPriceEntries[zone] = append(zoneToPriceEntries[zone], spotPricingEntry{
Timestamp: *history.Timestamp,
SpotPrice: spotPrice,
})
}
return true
})
if err != nil {
return float64(0), err
if errAPI != nil {
return float64(-1), errAPI
}
if processingErr != nil {
return float64(0), processingErr
return float64(-1), processingErr
}
} else {
for zone, priceEntries := range p.spotCache[instanceType] {
Expand All @@ -113,7 +138,7 @@ func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availab
aggregateZonePriceSum += p.calculateSpotAggregate(priceEntries)
}

return (aggregateZonePriceSum / float64(numOfZones)), nil
return aggregateZonePriceSum / float64(numOfZones), nil
}

func (p *EC2Pricing) calculateSpotAggregate(spotPriceEntries []spotPricingEntry) float64 {
Expand All @@ -134,11 +159,16 @@ func (p *EC2Pricing) calculateSpotAggregate(spotPriceEntries []spotPricingEntry)
duration := spotPriceEntries[int(math.Max(float64(i-1), 0))].Timestamp.Sub(entry.Timestamp).Minutes()
priceSum += duration * entry.SpotPrice
}
return (priceSum / totalDuration)
return priceSum / totalDuration
}

// GetOndemandInstanceTypeCost retrieves the on-demand hourly cost for the specified instance type
func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64, error) {
// Check cache first and return it if available
if price, ok := p.onDemandCache[instanceType]; ok {
return price, nil
}

regionDescription := p.getRegionForPricingAPI()
// TODO: mac.metal instances cannot be found with the below filters
productInput := pricing.GetProductsInput{
Expand All @@ -154,25 +184,25 @@ func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64,
},
}

// Check cache first and return it if available
if price, ok := p.cache[instanceType]; ok {
return price, nil
}

pricePerUnitInUSD := float64(-1)
err := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var err error
var processingErr error
errAPI := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var errParse error
for _, priceDoc := range pricingOutput.PriceList {
_, pricePerUnitInUSD, err = parseOndemandUnitPrice(priceDoc)
}
if err != nil {
// keep going through pages if we can't parse the pricing doc
return true
_, pricePerUnitInUSD, errParse = parseOndemandUnitPrice(priceDoc)
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
// keep going through pages if we can't parse the pricing doc
return true
}
}
return false
})
if err != nil {
return -1, err
if errAPI != nil {
return -1, errAPI
}
if processingErr != nil {
return -1, processingErr
}
return pricePerUnitInUSD, nil
}
Expand All @@ -182,7 +212,7 @@ func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64,
// There is no TTL on cache entries
// You'll only want to use this if you don't mind a long startup time (around 30 seconds) and will query the cache often after that.
func (p *EC2Pricing) HydrateSpotCache(days int) error {
newCache := map[string]map[string][]spotPricingEntry{}
newCache := make(map[string]map[string][]spotPricingEntry)

endTime := time.Now().UTC()
startTime := endTime.Add(time.Hour * time.Duration(24*-1*days))
Expand All @@ -192,14 +222,17 @@ func (p *EC2Pricing) HydrateSpotCache(days int) error {
EndTime: &endTime,
}
var processingErr error
err := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
errAPI := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
for _, history := range dspho.SpotPriceHistory {
var spotPrice float64
spotPrice, processingErr = strconv.ParseFloat(*history.SpotPrice, 64)
spotPrice, errFloat := strconv.ParseFloat(*history.SpotPrice, 64)
if errFloat != nil {
processingErr = multierr.Append(processingErr, errFloat)
continue
}
instanceType := *history.InstanceType
zone := *history.AvailabilityZone
if _, ok := newCache[instanceType]; !ok {
newCache[instanceType] = map[string][]spotPricingEntry{}
newCache[instanceType] = make(map[string][]spotPricingEntry)
}
newCache[instanceType][zone] = append(newCache[instanceType][zone], spotPricingEntry{
Timestamp: *history.Timestamp,
Expand All @@ -208,20 +241,21 @@ func (p *EC2Pricing) HydrateSpotCache(days int) error {
}
return true
})
if err != nil {
return err
if errAPI != nil {
return errAPI
}
cTime := time.Now().UTC()
p.spotCache = newCache
p.lastSpotCacheUTC = &cTime
return processingErr
}

// HydrateOndemandCache makes a bulk request to the pricing api to retrieve all instance type pricing and stores them in a local cache
// If HydrateOndemandCache is called more than once, the cache will be fully refreshed
// There is no TTL on cache entries
func (p *EC2Pricing) HydrateOndemandCache() error {
if p.cache == nil {
p.cache = make(map[string]float64)
}
newOnDemandCache := make(map[string]float64)

regionDescription := p.getRegionForPricingAPI()
productInput := pricing.GetProductsInput{
ServiceCode: aws.String(serviceCode),
Expand All @@ -234,17 +268,25 @@ func (p *EC2Pricing) HydrateOndemandCache() error {
{Type: aws.String(pricing.FilterTypeTermMatch), Field: aws.String("tenancy"), Value: aws.String("shared")},
},
}
err := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var processingErr error
errAPI := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
for _, priceDoc := range pricingOutput.PriceList {
instanceTypeName, price, err := parseOndemandUnitPrice(priceDoc)
if err != nil {
instanceTypeName, price, errParse := parseOndemandUnitPrice(priceDoc)
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
continue
}
p.cache[instanceTypeName] = price
newOnDemandCache[instanceTypeName] = price
}
return true
})
return err
if errAPI != nil {
return errAPI
}
cTime := time.Now().UTC()
p.onDemandCache = newOnDemandCache
p.lastOnDemandCacheUTC = &cTime
return processingErr
}

// getRegionForPricingAPI attempts to retrieve the region description based on the AWS session used to create
Expand Down
Loading

0 comments on commit 7578114

Please sign in to comment.