From 56fcd3ff8e4aa3b5a8b9d08c420fa90f7462c579 Mon Sep 17 00:00:00 2001 From: daviszhen <60595215+daviszhen@users.noreply.github.com> Date: Tue, 8 Mar 2022 20:33:18 +0800 Subject: [PATCH] Add epochgc (#1841) --- cmd/db-server/main.go | 7 + .../tpe/computation/computationHandler.go | 2 + pkg/vm/engine/tpe/descriptor/types.go | 10 +- pkg/vm/engine/tpe/engine/engine.go | 8 ++ pkg/vm/engine/tpe/epoch/epoch.go | 45 ------ pkg/vm/engine/tpe/epoch/epoch_test.go | 31 ---- .../tpe/tuplecodec/computationhandler.go | 13 +- pkg/vm/engine/tpe/tuplecodec/constants.go | 24 +++- pkg/vm/engine/tpe/tuplecodec/cubekv.go | 5 + .../tpe/tuplecodec/descriptorhandler.go | 134 ++++++++++++++++-- .../tpe/tuplecodec/descriptorhandler_test.go | 48 +++++++ pkg/vm/engine/tpe/tuplecodec/epoch.go | 95 +++++++++++++ pkg/vm/engine/tpe/tuplecodec/epoch_test.go | 91 ++++++++++++ .../tpe/tuplecodec/indexhandler_test.go | 6 +- pkg/vm/engine/tpe/tuplecodec/kv_types.go | 3 + pkg/vm/engine/tpe/tuplecodec/memorykv.go | 25 ++++ pkg/vm/engine/tpe/tuplecodec/memorykv_test.go | 52 +++++++ 17 files changed, 494 insertions(+), 105 deletions(-) delete mode 100644 pkg/vm/engine/tpe/epoch/epoch.go delete mode 100644 pkg/vm/engine/tpe/epoch/epoch_test.go create mode 100644 pkg/vm/engine/tpe/tuplecodec/epoch.go create mode 100644 pkg/vm/engine/tpe/tuplecodec/epoch_test.go diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go index 465dfff292965..84e28bbfdb67d 100644 --- a/cmd/db-server/main.go +++ b/cmd/db-server/main.go @@ -137,6 +137,13 @@ func removeEpoch(epoch uint64) { if err != nil { fmt.Printf("catalog remove ddl failed. error :%v \n", err) } + if tpe,ok := config.StorageEngine.(*tpeEngine.TpeEngine) ; ok { + err = tpe.RemoveDeletedTable(epoch) + if err != nil { + fmt.Printf("tpeEngine remove ddl failed. error :%v \n", err) + } + } + } func main() { diff --git a/pkg/vm/engine/tpe/computation/computationHandler.go b/pkg/vm/engine/tpe/computation/computationHandler.go index 12f88a6d4c03e..f0c49fdc41114 100644 --- a/pkg/vm/engine/tpe/computation/computationHandler.go +++ b/pkg/vm/engine/tpe/computation/computationHandler.go @@ -41,4 +41,6 @@ type ComputationHandler interface { Read(readCtx interface{}) (*batch.Batch, error) Write(writeCtx interface{}, bat *batch.Batch) error + + RemoveDeletedTable(epoch uint64) (int, error) } \ No newline at end of file diff --git a/pkg/vm/engine/tpe/descriptor/types.go b/pkg/vm/engine/tpe/descriptor/types.go index f9d07a3f9cfb2..604aff5b4b233 100644 --- a/pkg/vm/engine/tpe/descriptor/types.go +++ b/pkg/vm/engine/tpe/descriptor/types.go @@ -231,6 +231,12 @@ func ExtractIndexAttributeDescIDs(attrs []*AttributeDesc) []int { return ids } +type EpochGCItem struct { + Epoch uint64 + DbID uint64 + TableID uint64 +} + // DescriptorHandler loads and updates the descriptors type DescriptorHandler interface { @@ -267,6 +273,6 @@ type DescriptorHandler interface { //StoreRelationDescIntoAsyncGC stores the table into the asyncgc table StoreRelationDescIntoAsyncGC(epoch uint64, dbID uint64, desc *RelationDesc) error - //ListRelationDescFromAsyncGC gets all the tables from the asyncgc table - ListRelationDescFromAsyncGC(epoch uint64) ([]*RelationDesc, error) + //ListRelationDescFromAsyncGC gets all the tables need to deleted from the asyncgc table + ListRelationDescFromAsyncGC(epoch uint64) ([]EpochGCItem, error) } \ No newline at end of file diff --git a/pkg/vm/engine/tpe/engine/engine.go b/pkg/vm/engine/tpe/engine/engine.go index 2d0749118a898..9262aba36e9fa 100644 --- a/pkg/vm/engine/tpe/engine/engine.go +++ b/pkg/vm/engine/tpe/engine/engine.go @@ -91,4 +91,12 @@ func (te * TpeEngine) Database(name string) (engine.Database, error) { func (te * TpeEngine) Node(s string) *engine.NodeInfo { return &engine.NodeInfo{Mcpu: 1} +} + +func (te * TpeEngine) RemoveDeletedTable(epoch uint64) error { + _, err := te.computeHandler.RemoveDeletedTable(epoch) + if err != nil { + return err + } + return nil } \ No newline at end of file diff --git a/pkg/vm/engine/tpe/epoch/epoch.go b/pkg/vm/engine/tpe/epoch/epoch.go deleted file mode 100644 index 3e1eede9ee9c2..0000000000000 --- a/pkg/vm/engine/tpe/epoch/epoch.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package epoch - -import "sync" - -type EpochHandler struct { - epoch uint64 - rwlock sync.RWMutex -} - -func NewEpochHandler() *EpochHandler { - return &EpochHandler{} -} - -func (eh *EpochHandler) SetEpoch(e uint64) { - eh.rwlock.Lock() - defer eh.rwlock.Unlock() - eh.epoch = e -} - -func (eh *EpochHandler) GetEpoch() uint64{ - eh.rwlock.RLock() - defer eh.rwlock.RUnlock() - return eh.epoch -} - -func (eh *EpochHandler) RemoveDeletedTable(epoch uint64) (int, error) { - eh.rwlock.Lock() - defer eh.rwlock.Unlock() - //TODO:implement - return 0, nil -} \ No newline at end of file diff --git a/pkg/vm/engine/tpe/epoch/epoch_test.go b/pkg/vm/engine/tpe/epoch/epoch_test.go deleted file mode 100644 index da0ffb4903a6b..0000000000000 --- a/pkg/vm/engine/tpe/epoch/epoch_test.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package epoch - -import ( - "github.com/smartystreets/goconvey/convey" - "testing" -) - -func TestEpochHandler_SetEpoch(t *testing.T) { - convey.Convey("set/get",t, func() { - eh := NewEpochHandler() - - for i := 0; i < 100; i++ { - eh.SetEpoch(uint64(i)) - convey.So(uint64(i),convey.ShouldEqual, eh.GetEpoch()) - } - }) -} \ No newline at end of file diff --git a/pkg/vm/engine/tpe/tuplecodec/computationhandler.go b/pkg/vm/engine/tpe/tuplecodec/computationhandler.go index 03eba91254179..461ea3aafca64 100644 --- a/pkg/vm/engine/tpe/tuplecodec/computationhandler.go +++ b/pkg/vm/engine/tpe/tuplecodec/computationhandler.go @@ -46,6 +46,7 @@ type ComputationHandlerImpl struct { tch *TupleCodecHandler serializer ValueSerializer indexHandler index.IndexHandler + epochHandler * EpochHandler } func (chi *ComputationHandlerImpl) Read(readCtx interface{}) (*batch.Batch, error) { @@ -168,8 +169,8 @@ func (chi *ComputationHandlerImpl) GetDatabase(dbName string) (*descriptor.Datab //callbackForGetDatabaseDesc extracts the databaseDesc func (chi *ComputationHandlerImpl) callbackForGetDatabaseDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) { //get the name and the desc - descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID] - descDI := dis[InternalDescriptorTableID_desc_ID] + descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID] + descDI := dis[InternalDescriptorTable_desc_ID] if !(descDI.IsValueType(descAttr.Ttype)) { return nil,errorTypeInValueNotEqualToTypeInAttribute } @@ -303,8 +304,8 @@ func (chi *ComputationHandlerImpl) DropTableByDesc(epoch, dbId uint64, tableDesc //callbackForGetTableDesc extracts the tableDesc func (chi *ComputationHandlerImpl) callbackForGetTableDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) { //get the name and the desc - descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID] - descDI := dis[InternalDescriptorTableID_desc_ID] + descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID] + descDI := dis[InternalDescriptorTable_desc_ID] if !(descDI.IsValueType(descAttr.Ttype)) { return nil,errorTypeInValueNotEqualToTypeInAttribute } @@ -374,6 +375,10 @@ func (chi *ComputationHandlerImpl) GetTable(dbId uint64, name string) (*descript return tableDesc,nil } +func (chi *ComputationHandlerImpl) RemoveDeletedTable(epoch uint64) (int, error) { + return chi.epochHandler.RemoveDeletedTable(epoch) +} + type AttributeStateForWrite struct { PositionInBatch int diff --git a/pkg/vm/engine/tpe/tuplecodec/constants.go b/pkg/vm/engine/tpe/tuplecodec/constants.go index 1f7889252f578..ed19908f9d4b4 100644 --- a/pkg/vm/engine/tpe/tuplecodec/constants.go +++ b/pkg/vm/engine/tpe/tuplecodec/constants.go @@ -29,14 +29,18 @@ const ( //holding the schema of the table InternalDescriptorTableID uint64 = 0 - InternalDescriptorTableID_parentID_ID = 0 - InternalDescriptorTableID_id_ID = 1 - InternalDescriptorTableID_name_ID = 2 - InternalDescriptorTableID_desc_ID = 3 - PrimaryIndexID uint32 = 1 + InternalDescriptorTable_parentID_ID = 0 + InternalDescriptorTable_id_ID = 1 + InternalDescriptorTable_name_ID = 2 + InternalDescriptorTable_desc_ID = 3 + PrimaryIndexID uint32 = 1 //holding the epochgced table InternalAsyncGCTableID uint64 = 1 + InternalAsyncGCTable_epoch_ID = 0 + InternalAsyncGCTable_dbID_ID = 1 + InternalAsyncGCTable_tableID_ID = 2 + InternalAsyncGCTable_desc_ID = 3 //user table id offset UserTableIDOffset uint64 = 3 @@ -214,6 +218,16 @@ var ( ID: 0, Type: orderedcodec.VALUE_TYPE_UINT64, }, + { + Name: "dbID", + ID: 1, + Type: orderedcodec.VALUE_TYPE_UINT64, + }, + { + Name: "tableID", + ID: 2, + Type: orderedcodec.VALUE_TYPE_UINT64, + }, }, Impilict_attributes: nil, Composite_attributes: nil, diff --git a/pkg/vm/engine/tpe/tuplecodec/cubekv.go b/pkg/vm/engine/tpe/tuplecodec/cubekv.go index e0939487469bc..2c2b89ebacbe7 100644 --- a/pkg/vm/engine/tpe/tuplecodec/cubekv.go +++ b/pkg/vm/engine/tpe/tuplecodec/cubekv.go @@ -40,6 +40,7 @@ var ( errorCubeDriverIsNull = errors.New("cube driver is nil") errorInvalidIDPool = errors.New("invalid idpool") errorInvalidKeyValueCount = errors.New("key count != value count") + errorUnsupportedInCubeKV = errors.New("unsupported in cubekv") ) var _ KVHandler = &CubeKV{} @@ -215,6 +216,10 @@ func (ck * CubeKV) Delete(key TupleKey) error { return ck.Cube.Delete(key) } +func (ck *CubeKV) DeleteWithPrefix(prefix TupleKey) error { + return errorUnsupportedInCubeKV +} + // Get gets the value of the key. // If the key does not exist, it returns the null func (ck * CubeKV) Get(key TupleKey) (TupleValue, error) { diff --git a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go index 02c4c96c399bf..ba9a66aec7ab6 100644 --- a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go +++ b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler.go @@ -28,6 +28,8 @@ var ( errorDoNotFindTheValue = errors.New("do not find the value") errorDecodeDescriptorFailed = errors.New("decode the descriptor failed") errorDescriptorSavedIsNotTheWanted = errors.New("the descriptor saved is not the wanted one") + errorDescInAsyncGCIsNotBytes = errors.New("desc in asyncGC is not bytes") + errorIDInAsyncGCIsNotUint64 = errors.New("id in asyncGC is not uint64") ) /* Internal descriptor table for schema management. @@ -105,7 +107,7 @@ func (dhi *DescriptorHandlerImpl) encodeDatabaseDescIntoValue(parentID uint64, return dhi.encodeFieldsIntoValue(parentID, uint64(desc.ID),desc.Name,descBytes) } -// decodeValue decodes the data from (parentID,ID,Name,Bytes) +// decodeValue decodes the data into (parentID,ID,Name,Bytes) func (dhi *DescriptorHandlerImpl) decodeValue(data []byte) ([]*orderedcodec.DecodedItem,error) { attrCnt := len(internalDescriptorTableDesc.Attributes) dis := make([]*orderedcodec.DecodedItem,0,attrCnt) @@ -207,10 +209,10 @@ func (dhi *DescriptorHandlerImpl) GetValuesWithPrefix(parentID uint64, callbackC //callbackForGetTableDescByName extracts the tabledesc by name func (dhi *DescriptorHandlerImpl) callbackForGetTableDescByName(callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) { //get the name and the desc - nameAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_name_ID] - descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID] - nameDI := dis[InternalDescriptorTableID_name_ID] - descDI := dis[InternalDescriptorTableID_desc_ID] + nameAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_name_ID] + descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID] + nameDI := dis[InternalDescriptorTable_name_ID] + descDI := dis[InternalDescriptorTable_desc_ID] if !(nameDI.IsValueType(nameAttr.Ttype) || descDI.IsValueType(descAttr.Ttype)) { return nil,errorTypeInValueNotEqualToTypeInAttribute @@ -273,8 +275,8 @@ func (dhi *DescriptorHandlerImpl) LoadRelationDescByID(parentID uint64, tableID if err != nil { return nil, err } - descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID] - descDI := dis[InternalDescriptorTableID_desc_ID] + descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID] + descDI := dis[InternalDescriptorTable_desc_ID] if !descDI.IsValueType(descAttr.Ttype) { return nil,errorTypeInValueNotEqualToTypeInAttribute } @@ -403,8 +405,8 @@ func (dhi *DescriptorHandlerImpl) LoadDatabaseDescByID(dbID uint64) (*descriptor if err != nil { return nil, err } - descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID] - descDI := dis[InternalDescriptorTableID_desc_ID] + descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID] + descDI := dis[InternalDescriptorTable_desc_ID] if !descDI.IsValueType(descAttr.Ttype) { return nil,errorTypeInValueNotEqualToTypeInAttribute } @@ -519,14 +521,37 @@ func (dhi *DescriptorHandlerImpl) encodeAsyncgcValue(epoch uint64, dbID uint64, return out,nil } +// MakePrefixWithEpochAndDBIDAndTableID makes the prefix(tenantID,dbID,tableID,indexID,delEpoch,delDbID,delTableID) +func (dhi *DescriptorHandlerImpl) MakePrefixWithEpochAndDBIDAndTableID( + dbID uint64, tableID uint64, indexID uint64, + gcEpoch uint64, gcDbID uint64,gcTableID uint64)(TupleKey, *orderedcodec.EncodedItem){ + tke := dhi.codecHandler.GetEncoder() + + //make prefix + var prefix TupleKey + // tenantID,dbID,tableID,indexID + prefix,_ = tke.EncodeIndexPrefix(prefix, + dbID, + tableID, + indexID) + + // append gcEpoch + prefix,_ = tke.oe.EncodeUint64(prefix,gcEpoch) + // append gcDbId + prefix,_ = tke.oe.EncodeUint64(prefix,gcDbID) + // append gcTableId + prefix,_ = tke.oe.EncodeUint64(prefix,gcTableID) + return prefix,nil +} + func (dhi *DescriptorHandlerImpl) StoreRelationDescIntoAsyncGC(epoch uint64, dbID uint64, desc *descriptor.RelationDesc) error { - //save thing into the internal async gc (epoch(pk),dbid,tableid,desc) + //save thing into the internal async gc (epoch,dbid,tableid,desc), pk(epoch,dbid,tableid) //prefix(tenantID,dbID,tableID,indexID,epoch) var key TupleKey - key,_ = dhi.MakePrefixWithOneExtraID(InternalDatabaseID, + key,_ = dhi.MakePrefixWithEpochAndDBIDAndTableID(InternalDatabaseID, InternalAsyncGCTableID, uint64(PrimaryIndexID), - epoch) + epoch,dbID,uint64(desc.ID)) value, err := dhi.encodeAsyncgcValue(epoch,dbID, uint64(desc.ID),desc) if err != nil { @@ -541,9 +566,88 @@ func (dhi *DescriptorHandlerImpl) StoreRelationDescIntoAsyncGC(epoch uint64, dbI return nil } -func (dhi *DescriptorHandlerImpl) ListRelationDescFromAsyncGC(epoch uint64) ([]*descriptor.RelationDesc, error) { - //TODO: - return nil, nil +// decodeValueIntoEpochGCItem decodes bytes into the (epoch,dbID,tableID,desc) +func (dhi *DescriptorHandlerImpl) decodeValueIntoEpochGCItem(data []byte) ([]*orderedcodec.DecodedItem,error) { + attrCnt := len(internalAsyncGCTableDesc.Attributes) + dis := make([]*orderedcodec.DecodedItem,0,attrCnt) + for j := 0; j < attrCnt; j++ { + rest, di, err := dhi.serializer.DeserializeValue(data) + if err != nil { + return nil, err + } + dis = append(dis,di) + data = rest + } + return dis,nil +} + +func (dhi *DescriptorHandlerImpl) ListRelationDescFromAsyncGC(epoch uint64) ([]descriptor.EpochGCItem, error) { + var startPrefix TupleKey + tke := dhi.codecHandler.GetEncoder() + // tenantID,dbID,tableID,indexID + startPrefix,_ = tke.EncodeIndexPrefix(nil, + InternalDatabaseID, + InternalAsyncGCTableID, + uint64(PrimaryIndexID)) + + var endPrefix TupleKey + var nextEpoch uint64 = epoch + if epoch < math.MaxUint64 { + nextEpoch++ + } + // tenantID,dbID,tableID,indexID,epoch + endPrefix,_ = dhi.MakePrefixWithOneExtraID(InternalDatabaseID, + InternalAsyncGCTableID, + uint64(PrimaryIndexID), + nextEpoch) + + //get all epochgc (epoch,dbID,tableID,desc) + gcItems, err := dhi.kvHandler.GetRange(startPrefix,endPrefix) + if err != nil { + return nil, err + } + + var retItems []descriptor.EpochGCItem + + for _, item := range gcItems { + dis, err := dhi.decodeValueIntoEpochGCItem(item) + if err != nil { + return nil, err + } + + if descBytes,ok := dis[InternalAsyncGCTable_desc_ID].Value.([]byte); ok { + desc, err := UnmarshalRelationDesc(descBytes) + if err != nil { + return nil, err + } + //check maximal epoch + if epoch >= desc.Max_access_epoch { + var delDbID, delTableID,delEpoch uint64 + var ok2,ok3,ok4 bool + if delDbID,ok2 = dis[InternalAsyncGCTable_dbID_ID].Value.(uint64) ; !ok2 { + return nil, errorIDInAsyncGCIsNotUint64 + } + + if delTableID,ok3 = dis[InternalAsyncGCTable_tableID_ID].Value.(uint64) ; !ok3 { + return nil, errorIDInAsyncGCIsNotUint64 + } + + if delEpoch,ok4 = dis[InternalAsyncGCTable_epoch_ID].Value.(uint64); !ok4{ + return nil, errorIDInAsyncGCIsNotUint64 + } + + retItems = append(retItems,descriptor.EpochGCItem{ + Epoch: delEpoch, + DbID: delDbID, + TableID: delTableID, + }) + + } + }else{ + return nil, errorDescInAsyncGCIsNotBytes + } + } + return retItems, nil } diff --git a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go index 4edf250fe26c0..30d6607d80689 100644 --- a/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go +++ b/pkg/vm/engine/tpe/tuplecodec/descriptorhandler_test.go @@ -881,4 +881,52 @@ func TestDescriptorHandlerImpl_StoreDatabaseDescByID(t *testing.T) { convey.So(err,convey.ShouldBeError) } }) +} + +func TestDescriptorHandlerImpl_StoreRelationDescIntoAsyncGC(t *testing.T) { + convey.Convey("store relation desc into asyncgc",t, func() { + tch := NewTupleCodecHandler(SystemTenantID) + kv := NewMemoryKV() + serial := &DefaultValueSerializer{} + kvLimit := uint64(2) + + dhi := NewDescriptorHandlerImpl(tch,kv,serial,kvLimit) + + make_relation_desc := func(from *descriptor.RelationDesc,name string,id uint32) *descriptor.RelationDesc { + desc := new(descriptor.RelationDesc) + *desc = *from + desc.ID = id + desc.Name = name + return desc + } + + var wantGCItems []descriptor.EpochGCItem + + cnt := uint64(2) + + for epoch := uint64(0); epoch < cnt; epoch++ { + offset := 100 * epoch + for dbId := offset + uint64(0); dbId < offset + cnt; dbId++ { + for tableId := uint32(0); uint64(tableId) < cnt; tableId++ { + wantGCItems = append(wantGCItems,descriptor.EpochGCItem{ + Epoch: epoch, + DbID: dbId, + TableID: uint64(tableId), + }) + + tableName := fmt.Sprintf("table%d",tableId) + desc := make_relation_desc(internalDescriptorTableDesc,tableName,tableId) + err := dhi.StoreRelationDescIntoAsyncGC(epoch, dbId, desc) + convey.So(err,convey.ShouldBeNil) + } + } + } + + gcItems, err := dhi.ListRelationDescFromAsyncGC(cnt) + convey.So(err,convey.ShouldBeNil) + convey.So(len(gcItems),convey.ShouldEqual,len(wantGCItems)) + for i, item := range gcItems { + convey.So(item,convey.ShouldResemble,wantGCItems[i]) + } + }) } \ No newline at end of file diff --git a/pkg/vm/engine/tpe/tuplecodec/epoch.go b/pkg/vm/engine/tpe/tuplecodec/epoch.go new file mode 100644 index 0000000000000..71ab314df42b5 --- /dev/null +++ b/pkg/vm/engine/tpe/tuplecodec/epoch.go @@ -0,0 +1,95 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tuplecodec + +import ( + "errors" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/descriptor" + "sync" +) + +var ( + errorItIsNotDescriptorHandlerImpl = errors.New("it is not descriptor handler impl") +) + +type EpochHandler struct { + epoch uint64 + tch *TupleCodecHandler + dh descriptor.DescriptorHandler + kv KVHandler + rwlock sync.RWMutex +} + +func NewEpochHandler(tch *TupleCodecHandler, + dh descriptor.DescriptorHandler, + kv KVHandler) *EpochHandler { + return &EpochHandler{ + tch: tch, + dh: dh, + kv: kv, + } +} + +func (eh *EpochHandler) SetEpoch(e uint64) { + eh.rwlock.Lock() + defer eh.rwlock.Unlock() + eh.epoch = e +} + +func (eh *EpochHandler) GetEpoch() uint64{ + eh.rwlock.RLock() + defer eh.rwlock.RUnlock() + return eh.epoch +} + +func (eh *EpochHandler) RemoveDeletedTable(epoch uint64) (int, error) { + eh.rwlock.Lock() + defer eh.rwlock.Unlock() + + //1.load epoch items from asyncGC + gcItems, err := eh.dh.ListRelationDescFromAsyncGC(epoch) + if err != nil { + return 0, err + } + + tke := eh.tch.GetEncoder() + + dhi,ok := eh.dh.(*DescriptorHandlerImpl) + if !ok { + return 0, errorItIsNotDescriptorHandlerImpl + } + + //2. drop table data on gcItems + for _, item := range gcItems { + //delete the data in the table + prefixDeleted, _ := tke.EncodeIndexPrefix(nil, item.DbID, item.TableID,uint64(PrimaryIndexID)) + err = eh.kv.DeleteWithPrefix(prefixDeleted) + if err != nil { + return 0, err + } + + //3.delete gc item in asyncGC table + epochItemKeyDeleted,_ := dhi.MakePrefixWithEpochAndDBIDAndTableID( + InternalDatabaseID,InternalAsyncGCTableID,uint64(PrimaryIndexID), + item.Epoch,item.DbID,item.TableID) + + err = dhi.kvHandler.Delete(epochItemKeyDeleted) + if err != nil { + return 0, err + } + } + + return len(gcItems), nil +} \ No newline at end of file diff --git a/pkg/vm/engine/tpe/tuplecodec/epoch_test.go b/pkg/vm/engine/tpe/tuplecodec/epoch_test.go new file mode 100644 index 0000000000000..6a552d64a792c --- /dev/null +++ b/pkg/vm/engine/tpe/tuplecodec/epoch_test.go @@ -0,0 +1,91 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tuplecodec + +import ( + "fmt" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/descriptor" + "github.com/smartystreets/goconvey/convey" + "testing" +) + +func TestEpochHandler_SetEpoch(t *testing.T) { + convey.Convey("set/get",t, func() { + eh := NewEpochHandler(nil,nil,nil) + + for i := 0; i < 100; i++ { + eh.SetEpoch(uint64(i)) + convey.So(uint64(i),convey.ShouldEqual, eh.GetEpoch()) + } + }) +} + +func TestEpochHandler_RemoveDeletedTable(t *testing.T) { + convey.Convey("remove deleted table",t, func() { + tch := NewTupleCodecHandler(SystemTenantID) + kv := NewMemoryKV() + serial := &DefaultValueSerializer{} + kvLimit := uint64(2) + + dhi := NewDescriptorHandlerImpl(tch,kv,serial,kvLimit) + + eh := NewEpochHandler(tch,dhi,kv) + + make_relation_desc := func(from *descriptor.RelationDesc,name string,id uint32) *descriptor.RelationDesc { + desc := new(descriptor.RelationDesc) + *desc = *from + desc.ID = id + desc.Name = name + return desc + } + + var wantGCItems []descriptor.EpochGCItem + + cnt := uint64(2) + + for epoch := uint64(0); epoch < cnt; epoch++ { + offset := 100 * epoch + for dbId := UserTableIDOffset + offset + uint64(0); dbId < UserTableIDOffset + offset + cnt; dbId++ { + for tableId := uint32(0); uint64(tableId) < cnt; tableId++ { + wantGCItems = append(wantGCItems,descriptor.EpochGCItem{ + Epoch: epoch, + DbID: dbId, + TableID: uint64(tableId), + }) + + tableName := fmt.Sprintf("table%d",tableId) + desc := make_relation_desc(internalDescriptorTableDesc,tableName,tableId) + err := dhi.StoreRelationDescIntoAsyncGC(epoch, dbId, desc) + convey.So(err,convey.ShouldBeNil) + } + } + } + + gcItems, err := dhi.ListRelationDescFromAsyncGC(cnt) + convey.So(err,convey.ShouldBeNil) + convey.So(len(gcItems),convey.ShouldEqual,len(wantGCItems)) + for i, item := range gcItems { + convey.So(item,convey.ShouldResemble,wantGCItems[i]) + } + + gcCnt, err := eh.RemoveDeletedTable(cnt / 2 - 1) + convey.So(err,convey.ShouldBeNil) + convey.So(uint64(gcCnt),convey.ShouldEqual,cnt*cnt*cnt / 2) + + gcItems, err = dhi.ListRelationDescFromAsyncGC(cnt) + convey.So(err,convey.ShouldBeNil) + convey.So(len(gcItems),convey.ShouldEqual,cnt*cnt*cnt / 2) + }) +} \ No newline at end of file diff --git a/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go b/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go index 69ffbbe59a912..801e797526a2c 100644 --- a/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go +++ b/pkg/vm/engine/tpe/tuplecodec/indexhandler_test.go @@ -115,9 +115,9 @@ func TestIndexHandlerImpl_ReadFromIndex(t *testing.T) { convey.So(err,convey.ShouldBeNil) wantAttr := []*descriptor.AttributeDesc{ - &internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_parentID_ID], - &internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_id_ID], - &internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID], + &internalDescriptorTableDesc.Attributes[InternalDescriptorTable_parentID_ID], + &internalDescriptorTableDesc.Attributes[InternalDescriptorTable_id_ID], + &internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID], } readCtx := &ReadContext{ diff --git a/pkg/vm/engine/tpe/tuplecodec/kv_types.go b/pkg/vm/engine/tpe/tuplecodec/kv_types.go index 67a1391253cf2..85fb44ca60ac7 100644 --- a/pkg/vm/engine/tpe/tuplecodec/kv_types.go +++ b/pkg/vm/engine/tpe/tuplecodec/kv_types.go @@ -42,6 +42,9 @@ type KVHandler interface { // Delete deletes the key Delete(key TupleKey) error + // DeleteWithPrefix keys with the prefix + DeleteWithPrefix(prefix TupleKey) error + // Get gets the value of the key Get(key TupleKey)(TupleValue, error) diff --git a/pkg/vm/engine/tpe/tuplecodec/memorykv.go b/pkg/vm/engine/tpe/tuplecodec/memorykv.go index 188d50babb3ca..842b77126ef94 100644 --- a/pkg/vm/engine/tpe/tuplecodec/memorykv.go +++ b/pkg/vm/engine/tpe/tuplecodec/memorykv.go @@ -172,6 +172,31 @@ func (m *MemoryKV) Delete(key TupleKey) error { return nil } +func (m *MemoryKV) DeleteWithPrefix(prefix TupleKey) error { + m.rwLock.Lock() + defer m.rwLock.Unlock() + + var keys []TupleKey + iter := func(i btree.Item) bool { + if x,ok := i.(*MemoryItem); ok { + if bytes.HasPrefix(x.key,prefix) { + keys = append(keys,x.key) + } + }else{ + keys = append(keys,nil) + } + return true + } + + m.container.Ascend(iter) + + for _, key := range keys { + m.container.Delete(NewMemoryItem(key,nil)) + } + + return nil +} + func (m *MemoryKV) Get(key TupleKey) (TupleValue, error) { m.rwLock.RLock() defer m.rwLock.RUnlock() diff --git a/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go b/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go index bd5d58b8b2a5c..815e6b784f3f6 100644 --- a/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go +++ b/pkg/vm/engine/tpe/tuplecodec/memorykv_test.go @@ -15,6 +15,7 @@ package tuplecodec import ( + "bytes" "fmt" "github.com/smartystreets/goconvey/convey" "reflect" @@ -391,4 +392,55 @@ func TestMemoryKV_GetWithPrefix(t *testing.T) { last = SuccessorOfKey(keys[len(keys) - 1]) } }) +} + +func TestMemoryKV_DeletePrefix(t *testing.T) { + convey.Convey("delete prefix",t, func() { + kv := NewMemoryKV() + cnt := 10 + + genData := func(cnt int,handler KVHandler,prefix string) ([]TupleKey,[]TupleValue) { + var keys []TupleKey + var values []TupleValue + for i := 0; i < cnt; i++ { + key := fmt.Sprintf("%s%d",prefix,i) + value := fmt.Sprintf("v%d",i) + keys = append(keys,[]byte(key)) + values = append(values,[]byte(value)) + err := handler.Set([]byte(key), []byte(value)) + convey.So(err,convey.ShouldBeNil) + } + + return keys, values + } + + wKeys1, wValues1 := genData(cnt,kv,"abc") + err := kv.SetBatch(wKeys1,wValues1) + for _, e := range err { + convey.So(e,convey.ShouldBeNil) + } + + wKeys2, wValues2 := genData(cnt,kv,"cde") + err = kv.SetBatch(wKeys2,wValues2) + for _, e := range err { + convey.So(e,convey.ShouldBeNil) + } + + err2 := kv.DeleteWithPrefix([]byte("abc")) + convey.So(err2,convey.ShouldBeNil) + + resKeys, resValues, err3 := kv.GetWithPrefix(TupleKey("cde"), len("cde"), uint64(cnt)) + convey.So(err3,convey.ShouldBeNil) + + for i, key := range resKeys { + for j, wKey := range wKeys2 { + if bytes.Equal(key,wKey) { + convey.So(resValues[i], + convey.ShouldResemble,wValues2[j]) + }else { + break + } + } + } + }) } \ No newline at end of file