diff --git a/edgraph/server.go b/edgraph/server.go index 62be9d16d44..34983098505 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -838,7 +838,7 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo if req.StartTs == 0 { req.StartTs = posting.Oracle().MaxAssigned() } - queryRequest.Cache = worker.NoTxnCache + queryRequest.Cache = worker.NoCache } if req.StartTs == 0 { diff --git a/posting/list.go b/posting/list.go index b2a509432ae..4caf7742dc1 100644 --- a/posting/list.go +++ b/posting/list.go @@ -45,7 +45,7 @@ var ( ErrRetry = errors.New("Temporary error. Please retry") // ErrNoValue would be returned if no value was found in the posting list. ErrNoValue = errors.New("No value found") - errStopIteration = errors.New("Stop iteration") + ErrStopIteration = errors.New("Stop iteration") emptyPosting = &pb.Posting{} maxListSize = mb / 2 ) @@ -660,7 +660,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e log.Fatalf("Unhandled case during iteration of posting list.") } } - if err == errStopIteration { + if err == ErrStopIteration { return nil } return err @@ -673,7 +673,7 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { var count int err := l.iterate(readTs, afterUid, func(p *pb.Posting) error { count++ - return errStopIteration + return ErrStopIteration }) if err != nil { return false, err @@ -1065,7 +1065,7 @@ func (l *List) postingForLangs(readTs uint64, langs []string) (pos *pb.Posting, if p.PostingType == pb.Posting_VALUE_LANG { pos = p found = true - return errStopIteration + return ErrStopIteration } return nil }) @@ -1112,7 +1112,7 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti pos = p found = true } - return errStopIteration + return ErrStopIteration }) return found, pos, err diff --git a/posting/list_test.go b/posting/list_test.go index ed4ea02c3e8..5a294dd3b6a 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -155,7 +155,7 @@ func TestAddMutation(t *testing.T) { func getFirst(l *List, readTs uint64) (res pb.Posting) { l.Iterate(readTs, 0, func(p *pb.Posting) error { res = *p - return errStopIteration + return ErrStopIteration }) return res } diff --git a/types/s2.go b/types/s2.go index d678df05414..a4be7baf9c0 100644 --- a/types/s2.go +++ b/types/s2.go @@ -131,17 +131,52 @@ func Intersects(l1 *s2.Loop, l2 *s2.Loop) bool { return intersects(l1, l2) } -func closed(coords []geom.Coord) bool { - l := len(coords) - return coords[0][0] == coords[l-1][0] && coords[0][1] == coords[l-1][1] -} - func convertToGeom(str string) (geom.T, error) { + // validate would ensure that we have a closed loop for all the polygons. We don't support open + // loop polygons. + closed := func(p *geom.Polygon) error { + coords := p.Coords() + if len(coords) == 0 { + return errors.Errorf("Got empty polygon.") + } + // Check that first ring is closed. + c := coords[0] + l := len(c) + if c[0][0] == c[l-1][0] && c[0][1] == c[l-1][1] { + return nil + } + return errors.Errorf("Last coord not same as first") + } + + validate := func(g geom.T) (geom.T, error) { + switch v := g.(type) { + case *geom.MultiPolygon: + for i := 0; i < v.NumPolygons(); i++ { + if err := closed(v.Polygon(i)); err != nil { + return nil, err + } + } + case *geom.Polygon: + if err := closed(v); err != nil { + return nil, err + } + } + return g, nil + } + + var g geojson.Geometry + if err := json.Unmarshal([]byte(str), &g); err == nil { + t, err := g.Decode() + if err != nil { + return nil, err + } + return validate(t) + } + s := x.WhiteSpace.Replace(str) if len(s) < 5 { // [1,2] return nil, errors.Errorf("Invalid coordinates") } - var g geojson.Geometry var m json.RawMessage var err error @@ -156,18 +191,7 @@ func convertToGeom(str string) (geom.T, error) { if err != nil { return nil, errors.Wrapf(err, "Invalid coordinates") } - mp := g1.(*geom.MultiPolygon) - for i := 0; i < mp.NumPolygons(); i++ { - coords := mp.Polygon(i).Coords() - if len(coords) == 0 { - return nil, errors.Errorf("Got empty polygon inside multi-polygon.") - } - // Check that first ring is closed. - if !closed(mp.Polygon(i).Coords()[0]) { - return nil, errors.Errorf("Last coord not same as first") - } - } - return g1, nil + return validate(g1) } if s[0:3] == "[[[" { @@ -181,15 +205,7 @@ func convertToGeom(str string) (geom.T, error) { if err != nil { return nil, errors.Wrapf(err, "Invalid coordinates") } - coords := g1.(*geom.Polygon).Coords() - if len(coords) == 0 { - return nil, errors.Errorf("Got empty polygon.") - } - // Check that first ring is closed. - if !closed(coords[0]) { - return nil, errors.Errorf("Last coord not same as first") - } - return g1, nil + return validate(g1) } if s[0] == '[' { diff --git a/worker/task.go b/worker/task.go index bc3753506a9..327bfb57bc7 100644 --- a/worker/task.go +++ b/worker/task.go @@ -632,11 +632,10 @@ func (qs *queryState) handleUidPostings( tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} out.UidMatrix = append(out.UidMatrix, tlist) } - default: + case q.FacetParam != nil || facetsTree != nil: if i == 0 { - span.Annotate(nil, "default") + span.Annotate(nil, "default with facets") } - uidList := &pb.List{ Uids: make([]uint64, 0, pl.ApproxLen()), } @@ -668,6 +667,15 @@ func (qs *queryState) handleUidPostings( if q.FacetParam != nil { out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList}) } + default: + if i == 0 { + span.Annotate(nil, "default no facets") + } + uidList, err := pl.Uids(opts) + if err != nil { + return err + } + out.UidMatrix = append(out.UidMatrix, uidList) } } return nil @@ -695,19 +703,26 @@ func (qs *queryState) handleUidPostings( out.Counts = append(out.Counts, chunk.Counts...) out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...) } + var total int + for _, list := range out.UidMatrix { + total += len(list.Uids) + } + span.Annotatef(nil, "Total number of elements in matrix: %d", total) return nil } const ( // UseTxnCache indicates the transaction cache should be used. UseTxnCache = iota - // NoTxnCache indicates no transaction caches should be used. - NoTxnCache + // NoCache indicates no caches should be used. + NoCache ) // processTask processes the query, accumulates and returns the result. func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { - span := otrace.FromContext(ctx) + ctx, span := otrace.StartSpan(ctx, "processTask."+q.Attr) + defer span.End() + stop := x.SpanTimer(span, "processTask"+q.Attr) defer stop() @@ -743,9 +758,8 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro if q.Cache == UseTxnCache { qs.cache = posting.Oracle().CacheAt(q.ReadTs) } - if qs.cache == nil { - qs.cache = posting.NewLocalCache(q.ReadTs) - } + // For now, remove the query level cache. It is causing contention for queries with high + // fan-out. out, err := qs.helpProcessTask(ctx, q, gid) if err != nil { @@ -867,7 +881,7 @@ func (qs *queryState) helpProcessTask( // If geo filter, do value check for correctness. if srcFn.geoQuery != nil { span.Annotate(nil, "handleGeoFunction") - if err := qs.filterGeoFunction(funcArgs{q, gid, srcFn, out}); err != nil { + if err := qs.filterGeoFunction(ctx, funcArgs{q, gid, srcFn, out}); err != nil { return nil, err } } @@ -1236,49 +1250,70 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err return nil } -func (qs *queryState) filterGeoFunction(arg funcArgs) error { +func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error { + span := otrace.FromContext(ctx) + stop := x.SpanTimer(span, "filterGeoFunction") + defer stop() + attr := arg.q.Attr uids := algo.MergeSorted(arg.out.UidMatrix) - isList := schema.State().IsList(attr) - filtered := &pb.List{} - for _, uid := range uids.Uids { - pl, err := qs.cache.Get(x.DataKey(attr, uid)) - if err != nil { - return err - } - if !isList { - val, err := pl.Value(arg.q.ReadTs) - if err == posting.ErrNoValue { - continue - } else if err != nil { + numGo, width := x.DivideAndRule(len(uids.Uids)) + if span != nil && numGo > 1 { + span.Annotatef(nil, "Number of uids: %d. NumGo: %d. Width: %d\n", + len(uids.Uids), numGo, width) + } + + filtered := make([]*pb.List, numGo) + filter := func(idx, start, end int) error { + filtered[idx] = &pb.List{} + out := filtered[idx] + for _, uid := range uids.Uids[start:end] { + pl, err := qs.cache.Get(x.DataKey(attr, uid)) + if err != nil { return err } - newValue := &pb.TaskValue{ValType: val.Tid.Enum(), Val: val.Value.([]byte)} - if types.MatchGeo(newValue, arg.srcFn.geoQuery) { - filtered.Uids = append(filtered.Uids, uid) + var tv pb.TaskValue + err = pl.Iterate(arg.q.ReadTs, 0, func(p *pb.Posting) error { + tv.ValType = p.ValType + tv.Val = p.Value + if types.MatchGeo(&tv, arg.srcFn.geoQuery) { + out.Uids = append(out.Uids, uid) + return posting.ErrStopIteration + } + return nil + }) + if err != nil { + return err } - - continue } + return nil + } - // list type - vals, err := pl.AllValues(arg.q.ReadTs) - if err == posting.ErrNoValue { - continue - } else if err != nil { - return err + errCh := make(chan error, numGo) + for i := 0; i < numGo; i++ { + start := i * width + end := start + width + if end > len(uids.Uids) { + end = len(uids.Uids) } - for _, val := range vals { - newValue := &pb.TaskValue{ValType: val.Tid.Enum(), Val: val.Value.([]byte)} - if types.MatchGeo(newValue, arg.srcFn.geoQuery) { - filtered.Uids = append(filtered.Uids, uid) - break - } + go func(idx, start, end int) { + errCh <- filter(idx, start, end) + }(i, start, end) + } + for i := 0; i < numGo; i++ { + if err := <-errCh; err != nil { + return err } } - + final := &pb.List{} + for _, out := range filtered { + final.Uids = append(final.Uids, out.Uids...) + } + if span != nil && numGo > 1 { + span.Annotatef(nil, "Total uids after filtering geo: %d", len(final.Uids)) + } for i := 0; i < len(arg.out.UidMatrix); i++ { - algo.IntersectWith(arg.out.UidMatrix[i], filtered, arg.out.UidMatrix[i]) + algo.IntersectWith(arg.out.UidMatrix[i], final, arg.out.UidMatrix[i]) } return nil }