Skip to content

Commit

Permalink
feat : tests added for leer function
Browse files Browse the repository at this point in the history
Signed-off-by: SONIABHISHEK121 <[email protected]>
  • Loading branch information
ABHISHEKSONI121 committed Jun 19, 2024
1 parent 757db8b commit b5292eb
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 38 deletions.
71 changes: 35 additions & 36 deletions formatos/formato_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def modificar(fact, db, schema={}, webservice="wsfev1", ids=None, conf_db={}):


def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
from formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO
from .formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO

tablas, campos, campos_rev = configurar(schema)
cur = db.cursor()
Expand All @@ -405,11 +405,7 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
encabezado = {}
for i, k in enumerate(description):
val = row[i]
if isinstance(val, str):
val = val.decode(CHARSET)
if isinstance(val, basestring):
val = val.strip()
key = campos_rev["encabezado"].get(k[0], k[0].lower())
key = campos_rev["encabezado"].get(k[0], k[0]) #Instead of calling lower() on k[0], directly use k[0] as the default value if the key is not found
val = redondear(ENCABEZADO, key, val)
encabezado[key] = val
##print encabezado
Expand All @@ -418,23 +414,22 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(detalle)s WHERE %%(id)s = ?" % tablas)
% campos["detalle"],
[encabezado["id"]],
[encabezado.get("id")], #Instead of directly accessing the "id" key, we can use encabezado.get("id") to safely retrieve the value and avoid a KeyError if the key is not present.
)
ejecutar(
cur,
("SELECT * FROM %(detalle)s WHERE %%(id)s = ?" % tablas)
% campos["detalle"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
detalle = {}
for i, k in enumerate(cur.description):
val = it[i]
if isinstance(val, str):
val = val.decode(CHARSET)
key = campos_rev["detalle"].get(k[0], k[0].lower())
val = redondear(DETALLE, key, val)
detalle[key] = val
if i < len(it):
val = it[i] #removal of the val.decode(CHARSET) line, as it is not needed in Python 3.
key = campos_rev["detalle"].get(k[0], k[0])
val = redondear(DETALLE, key, val)
detalle[key] = val
detalles.append(detalle)
encabezado["detalles"] = detalles

Expand All @@ -443,20 +438,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(cmp_asoc)s WHERE %%(id)s = ?" % tablas)
% campos["cmp_asoc"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(cmp_asoc)s WHERE %%(id)s = ?" % tablas)
% campos["cmp_asoc"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
cmp_asoc = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["cmp_asoc"].get(k[0], k[0].lower())
cmp_asoc[key] = val
if i < len(it):
val = it[i]
key = campos_rev["cmp_asoc"].get(k[0], k[0])
cmp_asoc[key] = val
cmps_asoc.append(cmp_asoc)
if cmps_asoc:
encabezado["cbtes_asoc"] = cmps_asoc
Expand All @@ -466,20 +462,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(permiso)s WHERE %%(id)s = ?" % tablas)
% campos["permiso"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(permiso)s WHERE %%(id)s = ?" % tablas)
% campos["permiso"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
permiso = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["permiso"].get(k[0], k[0].lower())
permiso[key] = val
if i < len(it):
val = it[i]
key = campos_rev["permiso"].get(k[0], k[0])
permiso[key] = val
permisos.append(permiso)
if permisos:
encabezado["permisos"] = permisos
Expand All @@ -489,20 +486,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(iva)s WHERE %%(id)s = ?" % tablas)
% campos["iva"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(iva)s WHERE %%(id)s = ?" % tablas) % campos["iva"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
iva = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["iva"].get(k[0], k[0].lower())
val = redondear(IVA, key, val)
iva[key] = val
if i < len(it):
val = it[i]
key = campos_rev["iva"].get(k[0], k[0])
val = redondear(IVA, key, val)
iva[key] = val
ivas.append(iva)
if ivas:
encabezado["ivas"] = ivas
Expand All @@ -512,21 +510,22 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(tributo)s WHERE %%(id)s = ?" % tablas)
% campos["tributo"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(tributo)s WHERE %%(id)s = ?" % tablas)
% campos["tributo"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
tributo = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["tributo"].get(k[0], k[0].lower())
val = redondear(TRIBUTO, key, val)
tributo[key] = val
if i < len(it):
val = it[i]
key = campos_rev["tributo"].get(k[0], k[0])
val = redondear(TRIBUTO, key, val)
tributo[key] = val
tributos.append(tributo)
if tributos:
encabezado["tributos"] = tributos
Expand Down
137 changes: 135 additions & 2 deletions tests/test_formato_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pytest
from decimal import Decimal
from unittest.mock import MagicMock, patch
from pyafipws.formatos.formato_sql import esquema_sql, configurar, ejecutar, max_id, redondear, escribir, modificar, CAE_NULL, FECHA_VTO_NULL, RESULTADO_NULL, NULL
from pyafipws.formatos.formato_sql import esquema_sql, leer, configurar, ejecutar, max_id, redondear, escribir, modificar, CAE_NULL, FECHA_VTO_NULL, RESULTADO_NULL, NULL
from pyafipws.formatos.formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, A, N, I

@pytest.mark.dontusefix
Expand Down Expand Up @@ -874,4 +874,137 @@ def test_modificar_exception(db_mock, cur_mock, mocker):
with pytest.raises(Exception) as exc_info:
modificar(fact, db_mock, schema, webservice, ids, conf_db)

assert str(exc_info.value) == "Database error"
assert str(exc_info.value) == "Database error"


@pytest.fixture
def db_mock():
return MagicMock()

@pytest.fixture
def cur_mock(db_mock):
cur_mock = MagicMock()
db_mock.cursor.return_value = cur_mock
return cur_mock

@pytest.mark.dontusefix
@pytest.mark.parametrize("kwargs, expected_query", [
({}, "SELECT * FROM encabezado WHERE (resultado IS NULL OR resultado='' OR resultado=' ') AND (id IS NOT NULL) AND webservice=? ORDER BY tipo_cbte, punto_vta, cbte_nro"),
({"campo": "valor"}, "SELECT * FROM encabezado"),
])
def test_leer_query(db_mock, cur_mock, mocker, kwargs, expected_query):
schema = {}
webservice = "wsfev1"
ids = None
tablas = {"encabezado": "encabezado"}
campos = {"encabezado": {"id": "id", "resultado": "resultado", "webservice": "webservice", "tipo_cbte": "tipo_cbte", "punto_vta": "punto_vta", "cbte_nro": "cbte_nro"}}
campos_rev = {"encabezado": {"id": "id", "resultado": "resultado", "webservice": "webservice", "tipo_cbte": "tipo_cbte", "punto_vta": "punto_vta", "cbte_nro": "cbte_nro"}}
mocker.patch("pyafipws.formatos.formato_sql.configurar", return_value=(tablas, campos, campos_rev))
ejecutar_mock = mocker.patch("pyafipws.formatos.formato_sql.ejecutar")
mocker.patch("pyafipws.formatos.formato_sql.redondear")

list(leer(db_mock, schema, webservice, ids, **kwargs))

ejecutar_mock.assert_called_once_with(cur_mock, expected_query, [webservice] if not kwargs and not ids else ids)

@pytest.mark.dontusefix
def test_leer_fetchall(db_mock, cur_mock, mocker):
schema = {}
webservice = "wsfev1"
ids = None
tablas = {"encabezado": "encabezado", "detalle": "detalle", "cmp_asoc": "cmp_asoc", "permiso": "permiso", "iva": "iva", "tributo": "tributo"}
campos = {
"encabezado": {"id": "id", "resultado": "resultado", "webservice": "webservice", "tipo_cbte": "tipo_cbte", "punto_vta": "punto_vta", "cbte_nro": "cbte_nro"},
"detalle": {"id": "id"},
"cmp_asoc": {"id": "id"},
"permiso": {"id": "id"},
"iva": {"id": "id"},
"tributo": {"id": "id"}
}
campos_rev = {
"encabezado": {"id": "id", "resultado": "resultado", "webservice": "webservice", "tipo_cbte": "tipo_cbte", "punto_vta": "punto_vta", "cbte_nro": "cbte_nro"},
"detalle": {"id": "id"},
"cmp_asoc": {"id": "id"},
"permiso": {"id": "id"},
"iva": {"id": "id"},
"tributo": {"id": "id"}
}
mocker.patch("pyafipws.formatos.formato_sql.configurar", return_value=(tablas, campos, campos_rev))
ejecutar_mock = mocker.patch("pyafipws.formatos.formato_sql.ejecutar")
redondear_mock = mocker.patch("pyafipws.formatos.formato_sql.redondear", side_effect=lambda formato, clave, valor: valor)

encabezado_data = [("1", "Resultado 1", "wsfev1", "1", "2", "3"), ("2", "Resultado 2", "wsfev1", "4", "5", "6")]
detalle_data = [("1", "Detalle 1"), ("1", "Detalle 2"), ("2", "Detalle 3")]
cmp_asoc_data = [("1",), ("2",)]
permiso_data = [("1",), ("2",)]
iva_data = [("1",), ("1",), ("2",)]
tributo_data = [("1",), ("2",)]

cur_mock.fetchall.side_effect = [encabezado_data, detalle_data, cmp_asoc_data, permiso_data, iva_data, tributo_data, [], [], [], [], [], []]
cur_mock.description = [
(("id",), ("resultado",), ("webservice",), ("tipo_cbte",), ("punto_vta",), ("cbte_nro",)),
(("id",),),
(("id",),),
(("id",),),
(("id",),),
(("id",),)
]

result = list(leer(db_mock, schema, webservice, ids))


assert len(result) == 2
assert result[0][("id",)] == "3"
assert result[0]["detalles"][0][("id",)] == "Detalle 1"
assert result[0]["detalles"][1][("id",)] == "Detalle 2"
assert result[0]["detalles"][2][("id",)] == "Detalle 3"
assert result[0]["cbtes_asoc"][0][("id",)] == "1"
assert result[0]["cbtes_asoc"][1][("id",)] == "2"
assert result[0]["permisos"][0][("id",)] == "1"
assert result[0]["permisos"][1][("id",)] == "2"
assert result[0]["ivas"][0][("id",)] == "1"
assert result[0]["ivas"][1][("id",)] == "1"
assert result[0]["ivas"][2][("id",)] == "2"
assert result[0]["tributos"][0][("id",)] == "1"
assert result[0]["tributos"][1][("id",)] == "2"
assert result[1][("id",)] == "6"
assert len(result[1]["detalles"]) == 0


@pytest.mark.dontusefix
def test_leer_exception(db_mock, cur_mock, mocker):
schema = {}
webservice = "wsfev1"
ids = None
mocker.patch(
"pyafipws.formatos.formato_sql.configurar",
return_value=(
{'encabezado': 'encabezado'},
{
'encabezado': {
'id': 'id',
'resultado': 'resultado',
'webservice': 'webservice',
'tipo_cbte': 'tipo_cbte',
'punto_vta': 'punto_vta',
'cbte_nro': 'cbte_nro'
}
},
{
'encabezado': {
'id': 'id',
'resultado': 'resultado',
'webservice': 'webservice',
'tipo_cbte': 'tipo_cbte',
'punto_vta': 'punto_vta',
'cbte_nro': 'cbte_nro'
}
}
)
)
mocker.patch("pyafipws.formatos.formato_sql.ejecutar", side_effect=Exception("Database error"))

with pytest.raises(Exception) as exc_info:
list(leer(db_mock, schema, webservice, ids))

assert str(exc_info.value) == "Database error"

0 comments on commit b5292eb

Please sign in to comment.