Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read state in batches #489

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/__tests__/unit/evaluation-options.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'throw',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down Expand Up @@ -65,6 +66,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'throw',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down Expand Up @@ -98,6 +100,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'allow',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down Expand Up @@ -128,6 +131,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'allow',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down Expand Up @@ -158,6 +162,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'throw',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down Expand Up @@ -188,6 +193,7 @@ describe('Evaluation options evaluator', () => {
saveState: false
},
throwOnInternalWriteError: true,
transactionsPagesPerBatch: null,
unsafeClient: 'skip',
updateCacheForEachInteraction: false,
useKVStorage: false,
Expand Down
31 changes: 24 additions & 7 deletions src/contract/Contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,22 @@ export interface Contract<State = unknown> {
*/
readState(
sortKeyOrBlockHeight?: string | number,
caller?: string,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise<SortKeyCacheResult<EvalStateResult<State>>>;
/**
* Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on.
*
* Consider this as an experimental feature
*/
readStateBatch(pagesPerBatch: number, signal: AbortSignal): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(
sortKey: string,
interactions: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
* Returns the "view" of the state, computed by the SWC -
Expand All @@ -138,7 +149,8 @@ export interface Contract<State = unknown> {
input: Input,
tags?: Tags,
transfer?: ArTransfer,
caller?: string
caller?: string,
signal?: AbortSignal
): Promise<InteractionResult<State, View>>;

/**
Expand All @@ -155,7 +167,8 @@ export interface Contract<State = unknown> {
*/
viewStateForTx<Input = unknown, View = unknown>(
input: Input,
transaction: GQLNodeInterface
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, View>>;

/**
Expand All @@ -177,7 +190,11 @@ export interface Contract<State = unknown> {
vrf?: boolean
): Promise<InteractionResult<State, unknown>>;

applyInput<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>>;
applyInput<Input>(
input: Input,
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, unknown>>;

/**
* Writes a new "interaction" transaction - i.e. such transaction that stores input for the contract.
Expand All @@ -189,7 +206,7 @@ export interface Contract<State = unknown> {

/**
* Returns the full call tree report the last
* interaction with contract (eg. after reading state)
* interaction with contract (e.g. after reading state)
*/
getCallStack(): ContractCallRecord;

Expand Down
6 changes: 4 additions & 2 deletions src/contract/EvaluationOptionsEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator {
remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'],
useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'],
useConstructor: (foreignOptions) => foreignOptions['useConstructor'],
whitelistSources: () => this.rootOptions['whitelistSources']
whitelistSources: () => this.rootOptions['whitelistSources'],
transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch']
};

private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [
'useKVStorage',
'sourceType',
'useConstructor'
'useConstructor',
'transactionsPagesPerBatch'
];

/**
Expand Down
137 changes: 111 additions & 26 deletions src/contract/HandlerBasedContract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ import { SortKeyCacheResult } from '../cache/SortKeyCache';
import { ContractCallRecord, InteractionCall } from '../core/ContractCallRecord';
import { ExecutionContext } from '../core/ExecutionContext';
import {
AbortError,
ContractInteraction,
HandlerApi,
InteractionData,
InteractionResult,
InteractionType
} from '../core/modules/impl/HandlerExecutorFactory';
import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter';
import {
genesisSortKey,
LexicographicalInteractionsSorter
} from '../core/modules/impl/LexicographicalInteractionsSorter';
import { InteractionsSorter } from '../core/modules/InteractionsSorter';
import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator';
import { WARP_TAGS } from '../core/KnownTags';
Expand Down Expand Up @@ -38,9 +42,9 @@ import { Mutex } from 'async-mutex';
import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types';
import { InteractionState } from './states/InteractionState';
import { ContractInteractionState } from './states/ContractInteractionState';
import { Crypto } from 'warp-isomorphic';
import { Buffer, Crypto } from 'warp-isomorphic';
import { VrfPluginFunctions } from '../core/WarpPlugin';
import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles';
import { createData, DataItem, Signer, tagsExceedLimit } from 'warp-arbundles';

/**
* An implementation of {@link Contract} that is backwards compatible with current style
Expand Down Expand Up @@ -134,8 +138,8 @@ export class HandlerBasedContract<State> implements Contract<State> {

async readState(
sortKeyOrBlockHeight?: string | number,
caller?: string,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
this.logger.info('Read state for', {
contractTxId: this._contractTxId,
Expand All @@ -162,7 +166,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
const initBenchmark = Benchmark.measure();
this.maybeResetRootContract();

const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions);
const executionContext = await this.createExecutionContext(
this._contractTxId,
sortKey,
false,
interactions,
signal
);
this.logger.info('Execution Context', {
srcTxId: executionContext.contractDefinition?.srcTxId,
missingInteractions: executionContext.sortedInteractions?.length,
Expand Down Expand Up @@ -200,27 +210,86 @@ export class HandlerBasedContract<State> implements Contract<State> {

async readStateFor(
sortKey: string,
interactions: GQLNodeInterface[]
interactions: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
return this.readState(sortKey, undefined, interactions);
return this.readState(sortKey, interactions, signal);
}

async readStateBatch(pagesPerBatch = 1, signal: AbortSignal): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
if (!this.isRoot()) {
throw new Error('readStateBatch is only allowed for root contract calls');
}
if (pagesPerBatch < 1) {
throw new Error('At least one page per batch is required');
}
if (signal.aborted) {
throw new AbortError('readStateBatch aborted');
}

const contractTxId = this._contractTxId;
const { interactionsLoader, stateEvaluator } = this.warp;
let cachedState = await stateEvaluator.latestAvailableState<State>(contractTxId);

const evaluationOptions = {
...this._evaluationOptions,
transactionsPagesPerBatch: pagesPerBatch
};

let interactions: GQLNodeInterface[];
let batchesLoaded = 0;
do {
const batchBenchmark = Benchmark.measure();
this.logger.debug(`Loading ${++batchesLoaded}`, evaluationOptions);
interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions);
if (signal.aborted) {
throw new AbortError('readStateBatch aborted');
}
if (interactions.length == 0) {
break;
}
this.logger.debug(`Evaluating ${interactions.length} in ${batchesLoaded}`);
cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions, signal);
if (signal.aborted) {
throw new AbortError('readStateBatch aborted');
}
this.logger.debug(
`Batch ${batchesLoaded} evaluated in ${batchBenchmark.elapsed()} at sortKey ${cachedState.sortKey}`
);
} while (interactions.length > 0);
ppedziwiatr marked this conversation as resolved.
Show resolved Hide resolved

return cachedState;
}

async viewState<Input, View>(
input: Input,
tags: Tags = [],
transfer: ArTransfer = emptyTransfer,
caller?: string
caller?: string,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info('View state for', this._contractTxId);
return await this.callContract<Input, View>(input, 'view', caller, undefined, tags, transfer);
return await this.callContract<Input, View>(
input,
'view',
caller,
undefined,
tags,
transfer,
false,
false,
true,
signal
ppedziwiatr marked this conversation as resolved.
Show resolved Hide resolved
);
}

async viewStateForTx<Input, View>(
input: Input,
interactionTx: GQLNodeInterface
interactionTx: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info(`View state for ${this._contractTxId}`);
return await this.doApplyInputOnTx<Input, View>(input, interactionTx, 'view');
return await this.doApplyInputOnTx<Input, View>(input, interactionTx, 'view', signal);
}

async dryWrite<Input>(
Expand All @@ -234,9 +303,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
return await this.callContract<Input>(input, 'write', caller, undefined, tags, transfer, undefined, vrf);
}

async applyInput<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>> {
async applyInput<Input>(
input: Input,
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, unknown>> {
this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`);
return await this.doApplyInputOnTx<Input>(input, transaction, 'write');
return await this.doApplyInputOnTx<Input>(input, transaction, 'write', signal);
}

async writeInteraction<Input>(
Expand Down Expand Up @@ -509,7 +582,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
contractTxId: string,
upToSortKey?: string,
forceDefinitionLoad = false,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<ExecutionContext<State, HandlerApi<State>>> {
const { definitionLoader, interactionsLoader, stateEvaluator } = this.warp;
let cachedState: SortKeyCacheResult<EvalStateResult<State>>;
Expand All @@ -531,14 +605,17 @@ export class HandlerBasedContract<State> implements Contract<State> {

this.logger.debug('Cached state', cachedState, upToSortKey);

if (cachedState && cachedState.sortKey == upToSortKey) {
if (
(cachedState && cachedState.sortKey == upToSortKey) ||
(upToSortKey == genesisSortKey && interactions?.length)
) {
this.logger.debug('State fully cached, not loading interactions.');
if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) {
contractDefinition = await definitionLoader.load<State>(contractTxId, evolvedSrcTxId);
contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions);
this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions);
if (interactions?.length) {
sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map(
(i) => i.node
);
sortedInteractions = await this.getSortedInteractions(interactions);
}
}
} else {
Expand Down Expand Up @@ -616,10 +693,15 @@ export class HandlerBasedContract<State> implements Contract<State> {
evaluationOptions: contractEvaluationOptions || this.evaluationOptions(),
handler,
cachedState,
requestedSortKey: upToSortKey
requestedSortKey: upToSortKey,
signal
};
}

private async getSortedInteractions(interactions: GQLNodeInterface[]) {
return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node);
}

private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) {
if (this.isRoot()) {
this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions);
Expand Down Expand Up @@ -661,12 +743,13 @@ export class HandlerBasedContract<State> implements Contract<State> {

private async createExecutionContextFromTx(
contractTxId: string,
transaction: GQLNodeInterface
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<ExecutionContext<State, HandlerApi<State>>> {
const caller = transaction.owner.address;
const sortKey = transaction.sortKey;

const baseContext = await this.createExecutionContext(contractTxId, sortKey, true);
const baseContext = await this.createExecutionContext(contractTxId, sortKey, true, undefined, signal);

return {
...baseContext,
Expand Down Expand Up @@ -695,7 +778,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
transfer: ArTransfer = emptyTransfer,
strict = false,
vrf = false,
sign = true
sign = true,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info('Call contract input', input);
this.maybeResetRootContract();
Expand All @@ -704,7 +788,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
}
const { arweave, stateEvaluator } = this.warp;
// create execution context
let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true);
let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true, undefined, signal);

const currentBlockData =
this.warp.environment == 'mainnet' && !(this.warp.interactionsLoader.type() === 'arweave')
Expand Down Expand Up @@ -791,12 +875,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
private async doApplyInputOnTx<Input, View = unknown>(
input: Input,
interactionTx: GQLNodeInterface,
interactionType: InteractionType
interactionType: InteractionType,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.maybeResetRootContract();
let evalStateResult: SortKeyCacheResult<EvalStateResult<State>>;

const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx);
const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx, signal);

if (!this.isRoot() && this.interactionState().has(this.txId(), interactionTx.sortKey)) {
evalStateResult = new SortKeyCacheResult<EvalStateResult<State>>(
Expand Down
Loading