Skip to content

Commit

Permalink
Merge branch 'development' into deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Apr 3, 2020
2 parents 6ec34a0 + a8999f9 commit 294d8ef
Show file tree
Hide file tree
Showing 100 changed files with 1,530 additions and 729 deletions.
4 changes: 2 additions & 2 deletions api/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CacherStub struct {
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
RegisterHandlerCalled func(func(key []byte, value interface{}))
}

// Clear -
Expand Down Expand Up @@ -72,7 +72,7 @@ func (cs *CacherStub) MaxSize() int {
}

// RegisterHandler -
func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
func (cs *CacherStub) RegisterHandler(handler func(key []byte, value interface{})) {
cs.RegisterHandlerCalled(handler)
}

Expand Down
10 changes: 10 additions & 0 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,16 @@ func ProcessComponentsFactory(args *processComponentsFactoryArgs) (*Process, err
}

_, err = poolsCleaner.NewMiniBlocksPoolsCleaner(
blockTracker,
args.data.Datapool.MiniBlocks(),
rounder,
args.shardCoordinator,
)
if err != nil {
return nil, err
}

_, err = poolsCleaner.NewCrossTxsPoolsCleaner(
blockTracker,
args.data.Datapool,
rounder,
Expand Down
38 changes: 25 additions & 13 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,11 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
ctx,
externalConfig.ElasticSearchConnector,
externalConfig.ElasticSearchConnector.URL,
shardCoordinator,
coreComponents.InternalMarshalizer,
coreComponents.Hasher,
nodesCoordinator,
epochStartNotifier,
shardCoordinator.SelfId(),
)
if err != nil {
return err
Expand Down Expand Up @@ -877,7 +879,7 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {

if shardCoordinator.SelfId() == core.MetachainShardId {
log.Trace("activating nodesCoordinator's validators indexing")
indexValidatorsListIfNeeded(elasticIndexer, nodesCoordinator, log)
indexValidatorsListIfNeeded(elasticIndexer, nodesCoordinator, processComponents.EpochStartTrigger.Epoch(), log)
}

log.Trace("creating api resolver structure")
Expand Down Expand Up @@ -1056,18 +1058,24 @@ func prepareLogFile(workingDir string) (*os.File, error) {
return fileForLog, nil
}

func indexValidatorsListIfNeeded(elasticIndexer indexer.Indexer, coordinator sharding.NodesCoordinator, log logger.Logger) {
func indexValidatorsListIfNeeded(
elasticIndexer indexer.Indexer,
coordinator sharding.NodesCoordinator,
epoch uint32,
log logger.Logger,

) {
if check.IfNil(elasticIndexer) {
return
}

validatorsPubKeys, err := coordinator.GetAllEligibleValidatorsPublicKeys(0)
validatorsPubKeys, err := coordinator.GetAllEligibleValidatorsPublicKeys(epoch)
if err != nil {
log.Warn("GetAllEligibleValidatorPublicKeys for epoch 0 failed", "error", err)
}

if len(validatorsPubKeys) > 0 {
go elasticIndexer.SaveValidatorsPubKeys(validatorsPubKeys)
go elasticIndexer.SaveValidatorsPubKeys(validatorsPubKeys, epoch)
}
}

Expand Down Expand Up @@ -1306,18 +1314,22 @@ func createElasticIndexer(
ctx *cli.Context,
elasticSearchConfig config.ElasticSearchConfig,
url string,
coordinator sharding.Coordinator,
marshalizer marshal.Marshalizer,
hasher hashing.Hasher,
nodesCoordinator sharding.NodesCoordinator,
startNotifier notifier.EpochStartNotifier,
shardId uint32,
) (indexer.Indexer, error) {
arguments := indexer.ElasticIndexerArgs{
Url: url,
UserName: elasticSearchConfig.Username,
Password: elasticSearchConfig.Password,
ShardCoordinator: coordinator,
Marshalizer: marshalizer,
Hasher: hasher,
Options: &indexer.Options{TxIndexingEnabled: ctx.GlobalBoolT(enableTxIndexing.Name)},
Url: url,
UserName: elasticSearchConfig.Username,
Password: elasticSearchConfig.Password,
Marshalizer: marshalizer,
Hasher: hasher,
Options: &indexer.Options{TxIndexingEnabled: ctx.GlobalBoolT(enableTxIndexing.Name)},
NodesCoordinator: nodesCoordinator,
EpochStartNotifier: startNotifier,
ShardId: shardId,
}

var err error
Expand Down
14 changes: 7 additions & 7 deletions consensus/mock/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ type IndexerMock struct {
}

// SaveBlock -
func (im *IndexerMock) SaveBlock(body data.BodyHandler, header data.HeaderHandler, txPool map[string]data.TransactionHandler, signersIndexes []uint64) {
func (im *IndexerMock) SaveBlock(_ data.BodyHandler, _ data.HeaderHandler, _ map[string]data.TransactionHandler, _ []uint64, _ []string) {
panic("implement me")
}

// SaveMetaBlock -
func (im *IndexerMock) SaveMetaBlock(header data.HeaderHandler, signersIndexes []uint64) {
panic("implement me")
// SaveValidatorsRating -
func (im *IndexerMock) SaveValidatorsRating(_ string, _ []indexer.ValidatorRatingInfo) {

}

// UpdateTPS -
func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) {
func (im *IndexerMock) UpdateTPS(_ statistics.TPSBenchmark) {
panic("implement me")
}

// SaveRoundInfo -
func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) {
func (im *IndexerMock) SaveRoundInfo(_ indexer.RoundInfo) {
panic("implement me")
}

// SaveValidatorsPubKeys -
func (im *IndexerMock) SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte) {
func (im *IndexerMock) SaveValidatorsPubKeys(_ map[uint32][][]byte, _ uint32) {
panic("implement me")
}

Expand Down
2 changes: 2 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ const (
ConsensusOrder
// NetworkShardingOrder defines the order in which the network sharding subsystem is notified of a start of epoch event
NetworkShardingOrder
// IndexerOrder defines the order in which Indexer is notified of a start of epoch event
IndexerOrder
)

// NodeState specifies what type of state a node could have
Expand Down
7 changes: 5 additions & 2 deletions core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ var ErrNilMarshalizer = errors.New("nil marshalizer provided")
// ErrNilHasher signals that a nil hasher has been provided
var ErrNilHasher = errors.New("nil hasher provided")

// ErrNilCoordinator signals that a nil shardCoordinator has been provided
var ErrNilCoordinator = errors.New("nil coordinator provided")
// ErrNilNodesCoordinator signals a nil nodes coordinator has been provided
var ErrNilNodesCoordinator = errors.New("nil nodes coordinator")

// ErrInvalidValue signals that a nil value has been provided
var ErrInvalidValue = errors.New("invalid value provided")
Expand Down Expand Up @@ -49,3 +49,6 @@ var ErrNilStatusTagProvider = errors.New("nil status tag provider")
// ErrInvalidIdentifierForEpochStartBlockRequest signals that an invalid identifier for epoch start block request
// has been provided
var ErrInvalidIdentifierForEpochStartBlockRequest = errors.New("invalid identifier for epoch start block request")

// ErrNilEpochStartNotifier signals that nil epoch start notifier has been provided
var ErrNilEpochStartNotifier = errors.New("nil epoch start notifier")
77 changes: 73 additions & 4 deletions core/indexer/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ func checkElasticSearchParams(arguments ElasticIndexerArgs) error {
if arguments.Password == "" {
return ErrEmptyPassword
}
if check.IfNil(arguments.ShardCoordinator) {
return core.ErrNilCoordinator
}
if check.IfNil(arguments.Marshalizer) {
return core.ErrNilMarshalizer
}
if check.IfNil(arguments.Hasher) {
return core.ErrNilHasher
}
if check.IfNil(arguments.NodesCoordinator) {
return core.ErrNilNodesCoordinator
}
if arguments.EpochStartNotifier == nil {
return core.ErrNilEpochStartNotifier
}

return nil
}
Expand Down Expand Up @@ -213,7 +216,7 @@ func buildRewardTransaction(
Round: rTx.Round,
Value: rTx.Value.String(),
Receiver: hex.EncodeToString(rTx.RcvAddr),
Sender: metachainTpsDocID,
Sender: fmt.Sprintf("%d", core.MetachainShardId),
ReceiverShard: mb.ReceiverShardID,
SenderShard: mb.SenderShardID,
GasPrice: 0,
Expand Down Expand Up @@ -252,3 +255,69 @@ func buildReceiptTransaction(
Status: "Success",
}
}

func serializeBulkMiniBlocks(hdrShardID uint32, bulkMbs []*Miniblock) bytes.Buffer {
var buff bytes.Buffer

for _, mb := range bulkMbs {
var err error
var meta, serializedData []byte
if hdrShardID == mb.SenderShardID {
//insert miniblock
meta = []byte(fmt.Sprintf(`{ "index" : { "_id" : "%s", "_type" : "%s" } }%s`, mb.Hash, "_doc", "\n"))
serializedData, err = json.Marshal(mb)
if err != nil {
log.Debug("indexer: marshal",
"error", "could not serialize miniblock, will skip indexing",
"mb hash", mb.Hash)
continue
}
} else {
// update miniblock
meta = []byte(fmt.Sprintf(`{ "update" : { "_id" : "%s" } }%s`, mb.Hash, "\n"))
serializedData = []byte(fmt.Sprintf(`{ "doc": { "receiverBlockHash" : "%s" } }`, mb.ReceiverBlockHash))
}

// append a newline for each element
serializedData = append(serializedData, "\n"...)
buff.Grow(len(meta) + len(serializedData))
_, err = buff.Write(meta)
if err != nil {
log.Warn("elastic search: serialize bulk miniblocks, write meta", "error", err.Error())
}
_, err = buff.Write(serializedData)
if err != nil {
log.Warn("elastic search: serialize bulk miniblocks, write serialized miniblock", "error", err.Error())
}
}

return buff
}

func serializeBulkTxs(bulk []*Transaction) bytes.Buffer {
var buff bytes.Buffer
for _, tx := range bulk {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%s", "_type" : "%s" } }%s`, tx.Hash, "_doc", "\n"))
serializedTx, err := json.Marshal(tx)
if err != nil {
log.Debug("indexer: marshal",
"error", "could not serialize transaction, will skip indexing",
"tx hash", tx.Hash)
continue
}
// append a newline for each element
serializedTx = append(serializedTx, "\n"...)

buff.Grow(len(meta) + len(serializedTx))
_, err = buff.Write(meta)
if err != nil {
log.Warn("elastic search: serialize bulk tx, write meta", "error", err.Error())
}
_, err = buff.Write(serializedTx)
if err != nil {
log.Warn("elastic search: serialize bulk tx, write serialized tx", "error", err.Error())
}
}

return buff
}
4 changes: 3 additions & 1 deletion core/indexer/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package indexer

import (
"encoding/hex"
"fmt"
"math/big"
"testing"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/data/block"
"github.com/ElrondNetwork/elrond-go/data/receipt"
"github.com/ElrondNetwork/elrond-go/data/rewardTx"
Expand Down Expand Up @@ -56,7 +58,7 @@ func TestGetTransactionByType_RewardTx(t *testing.T) {
Receiver: hex.EncodeToString(rcvAddr),
Status: "Success",
Value: "<nil>",
Sender: metachainTpsDocID,
Sender: fmt.Sprintf("%d", core.MetachainShardId),
Data: []byte(""),
}
require.Equal(t, expectedTx, resultTx)
Expand Down
2 changes: 2 additions & 0 deletions core/indexer/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package indexer
const txBulkSize = 1000
const txIndex = "transactions"
const blockIndex = "blocks"
const miniblocksIndex = "miniblocks"
const tpsIndex = "tps"
const validatorsIndex = "validators"
const roundIndex = "rounds"
const ratingIndex = "rating"

const metachainTpsDocID = "meta"
const shardTpsDocIDPrefix = "shard"
52 changes: 38 additions & 14 deletions core/indexer/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// to be saved for a transaction. It has all the default fields
// plus some extra information for ease of search and filter
type Transaction struct {
Hash string `json:"hash"`
Hash string `json:"-"`
MBHash string `json:"miniBlockHash"`
BlockHash string `json:"blockHash"`
Nonce uint64 `json:"nonce"`
Expand All @@ -31,18 +31,21 @@ type Transaction struct {
// to be saved for a block. It has all the default fields
// plus some extra information for ease of search and filter
type Block struct {
Nonce uint64 `json:"nonce"`
Round uint64 `json:"round"`
Hash string `json:"hash"`
Proposer uint64 `json:"proposer"`
Validators []uint64 `json:"validators"`
PubKeyBitmap string `json:"pubKeyBitmap"`
Size int64 `json:"size"`
Timestamp time.Duration `json:"timestamp"`
StateRootHash string `json:"stateRootHash"`
PrevHash string `json:"prevHash"`
ShardID uint32 `json:"shardId"`
TxCount uint32 `json:"txCount"`
Nonce uint64 `json:"nonce"`
Round uint64 `json:"round"`
Epoch uint32 `json:"epoch"`
Hash string `json:"-"`
MiniBlocksHashes []string `json:"miniBlocksHashes"`
NotarizedBlocksHashes []string `json:"notarizedBlocksHashes"`
Proposer uint64 `json:"proposer"`
Validators []uint64 `json:"validators"`
PubKeyBitmap string `json:"pubKeyBitmap"`
Size int64 `json:"size"`
Timestamp time.Duration `json:"timestamp"`
StateRootHash string `json:"stateRootHash"`
PrevHash string `json:"prevHash"`
ShardID uint32 `json:"shardId"`
TxCount uint32 `json:"txCount"`
}

//ValidatorsPublicKeys is a structure containing fields for validators public keys
Expand All @@ -52,13 +55,34 @@ type ValidatorsPublicKeys struct {

// RoundInfo is a structure containing block signers and shard id
type RoundInfo struct {
Index uint64 `json:"-"`
Index uint64 `json:"round"`
SignersIndexes []uint64 `json:"signersIndexes"`
BlockWasProposed bool `json:"blockWasProposed"`
ShardId uint32 `json:"shardId"`
Timestamp time.Duration `json:"timestamp"`
}

// ValidatorsRatingInfo is a structure containing validators information
type ValidatorsRatingInfo struct {
ValidatorsInfos []ValidatorRatingInfo `json:"validatorsRating"`
}

// ValidatorRatingInfo is a structure containing validator rating information
type ValidatorRatingInfo struct {
PublicKey string `json:"publicKey"`
Rating float32 `json:"rating"`
}

// Miniblock is a structure containing miniblock information
type Miniblock struct {
Hash string `json:"-"`
SenderShardID uint32 `json:"senderShard"`
ReceiverShardID uint32 `json:"receiverShard"`
SenderBlockHash string `json:"senderBlockHash"`
ReceiverBlockHash string `json:"receiverBlockHash"`
Type string `json:"type"`
}

// TPS is a structure containing all the fields that need to
// be saved for a shard statistic in the database
type TPS struct {
Expand Down
Loading

0 comments on commit 294d8ef

Please sign in to comment.