-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmongoDB_use.py
116 lines (94 loc) · 3.44 KB
/
mongoDB_use.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
pyMongo的使用方法
'''
import pymongo
import datetime
def get_db():
# 建立连接
try:
client = pymongo.MongoClient(host="10.2.25.10", port=27017)
except Exception as e:
print(str(e))
db = client['example']
# 或者 db = client.example
print(db)
return db
def get_collection(db):
# 选择集合(mongo中collection和database都是延时创建的)
# 创建一个表,表明为informations,例如: db.informations.insert()
coll = db['informations']
print(db.collection_names())
return coll
def insert_one_doc(db):
# 插入一个document
coll = db['informations']
information = {"name": "liujian", "age": "25"}
information_id = coll.insert(information)
print(information_id)
def insert_multi_docs(db):
# 批量插入documents,插入一个数组
coll = db['informations']
information = [{"name": "xiaoming", "age": "25"},
{"name": "xiaoqiang", "age": "24"}]
information_id = coll.insert(information)
print(information_id)
def get_one_doc(db):
# 有就返回一个,没有就返回None
coll = db['informations']
print(coll.find_one()) # 返回第一条记录
print(coll.find_one({"name": "liujian"}))
print(coll.find_one({"name": "none"}))
def get_one_by_id(db):
# 通过objectid来查找一个doc
coll = db['informations']
obj = coll.find_one()
obj_id = obj["_id"]
print("_id 为ObjectId类型,obj_id:" + str(obj_id))
print(coll.find_one({"_id": obj_id}))
# 需要注意这里的obj_id是一个对象,不是一个str,使用str类型作为_id的值无法找到记录
print("_id 为str类型 ")
print(coll.find_one({"_id": str(obj_id)}))
# 可以通过ObjectId方法把str转成ObjectId类型
from bson.objectid import ObjectId
print("_id 转换成ObjectId类型")
print(coll.find_one({"_id": ObjectId(str(obj_id))}))
def get_many_docs(db):
# mongo中提供了过滤查找的方法,可以通过各种条件筛选来获取数据集,还可以对数据进行计数,排序等处理
coll = db['informations']
# ASCENDING = 1 升序;DESCENDING = -1降序;default is ASCENDING
for item in coll.find().sort("age", pymongo.DESCENDING):
print(item)
count = coll.count()
print("集合中所有数据 %s个" % int(count))
# 条件查询
count = coll.find({"name": "liujian"}).count()
print("liujian: %s" % count)
def clear_all_datas(db):
# 清空一个集合中的所有数据
db["informations"].remove()
if __name__ == '__main__':
db = get_db()
my_collection = get_collection(db)
post = {"author": "Mike", "text": "My first blog post!", "tags": ["mongodb", "python", "pymongo"],
"date": datetime.datetime.utcnow()}
# 插入记录
my_collection.insert(post)
insert_one_doc(db)
# 条件查询
print(my_collection.find_one({"x": "10"}))
# 查询表中所有的数据
for iii in my_collection.find():
print(iii)
print(my_collection.count())
my_collection.update({"author": "Mike"},
{"author": "liujian", "text": "My first blog post!", "tags": ["mongodb", "python", "pymongo"],
"date": datetime.datetime.utcnow()})
for jjj in my_collection.find():
print(jjj)
get_one_doc(db)
get_one_by_id(db)
get_many_docs(db)
# clear_all_datas(db)
clear_all_datas(db)