Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
Luciano Barletta
/
message-service
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
1
Wiki
Network
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
Commit 68d15a86
authored
2019-09-27 15:52:10 +0000
by
Luciano Barletta
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
updated for multifile request, made some improvements, not tested
1 parent
3fdaf358
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
198 additions
and
100 deletions
.gitignore
database.py
deploy.py
enums.py
process.py
services.py
test_flask.py
.gitignore
View file @
68d15a8
...
@@ -3,3 +3,4 @@
...
@@ -3,3 +3,4 @@
!*.md
!*.md
!templates
!templates
!install.sh
!install.sh
!tests
\ No newline at end of file
\ No newline at end of file
database.py
View file @
68d15a8
...
@@ -6,12 +6,9 @@ class DBconnection:
...
@@ -6,12 +6,9 @@ class DBconnection:
tables
=
[
"msg"
,
"history"
]
tables
=
[
"msg"
,
"history"
]
states
=
[
States
.
queued
,
States
.
delivered
]
structure
=
{
structure
=
{
Table
.
id
:
"integer PRIMARY KEY"
,
Table
.
id
:
"integer PRIMARY KEY"
,
Table
.
path
:
"text"
,
Table
.
path
:
"text"
,
Table
.
file
:
"text"
,
Table
.
serv
:
"text"
,
Table
.
serv
:
"text"
,
Table
.
dest
:
"text"
,
Table
.
dest
:
"text"
,
Table
.
type
:
"text"
,
Table
.
type
:
"text"
,
...
@@ -67,19 +64,36 @@ class DBconnection:
...
@@ -67,19 +64,36 @@ class DBconnection:
con
.
close
()
con
.
close
()
return
id
return
id
def
update
(
self
,
table
,
comparator
,
alterations
):
if
not
table
in
DBconnection
.
tables
:
return
"La tabla "
+
table
+
" no existe o no está contemplada"
if
not
Table
.
validate
(
comparator
[
0
]):
return
"El comparador no es una columna valida"
query
=
"UPDATE "
+
table
+
" SET "
where
=
" WHERE "
+
comparator
[
0
]
+
"="
+
comparator
[
1
]
for
column
in
alterations
:
if
not
self
.
check
(
column
,
alterations
[
column
]):
return
"El dato '"
+
alterations
[
column
]
+
"' no es valido"
query
+=
column
+
"='"
+
alterations
[
column
]
+
"',"
query
=
query
.
strip
(
","
)
ipdb
.
set_trace
()
con
=
sqlite3
.
connect
(
self
.
db
)
cursor
=
con
.
cursor
()
cursor
.
execute
(
query
+
where
)
con
.
commit
()
con
.
close
()
def
check
(
self
,
column
,
data
):
def
check
(
self
,
column
,
data
):
if
column
==
Table
.
file
:
return
True
if
column
==
Table
.
path
:
if
column
==
Table
.
path
:
return
True
return
True
if
column
==
Table
.
serv
:
if
column
==
Table
.
serv
:
return
Services
.
validate
(
data
)
return
Services
.
validate
(
data
)
if
column
==
Table
.
dest
and
len
(
data
)
<=
13
:
if
column
==
Table
.
dest
:
return
True
return
True
if
column
==
Table
.
type
:
if
column
==
Table
.
type
:
return
Datatypes
.
validate
(
data
)
return
Datatypes
.
validate
(
data
)
if
column
==
Table
.
state
:
if
column
==
Table
.
state
:
return
(
data
in
DBconnection
.
states
)
return
States
.
validate
(
data
)
return
False
return
False
@staticmethod
@staticmethod
...
...
deploy.py
View file @
68d15a8
...
@@ -14,7 +14,13 @@ app = Flask(__name__)
...
@@ -14,7 +14,13 @@ app = Flask(__name__)
retry_timer
=
10
retry_timer
=
10
clean_timer
=
20
clean_timer
=
20
prefix_lenght
=
16
prefix_lenght
=
16
filename
=
{}
operation_timer
=
86400
# folder for all messages
msgfolder
=
"msg/"
# database connection
process
=
Process
(
"messages.db"
)
@app.route
(
'/'
)
@app.route
(
'/'
)
def
main
():
def
main
():
...
@@ -30,14 +36,30 @@ def key():
...
@@ -30,14 +36,30 @@ def key():
@app.route
(
'/data'
,
methods
=
[
'POST'
])
@app.route
(
'/data'
,
methods
=
[
'POST'
])
def
data
():
def
data
():
prefix
=
newprefix
()
prefix
=
newprefix
()
path
=
msgfolder
+
prefix
+
"/"
os
.
mkdir
(
path
)
key
=
request
.
files
.
get
(
'key'
)
key
=
request
.
files
.
get
(
'key'
)
if
key
!=
None
:
if
key
:
key
.
save
(
prefix
+
"_key.enc"
)
request
.
files
[
file
]
.
save
(
path
+
"rand.key.enc"
)
request
.
files
[
'data'
]
.
save
(
prefix
+
".enc"
)
# decrypt random key with stored private key and store in prefix folder
os
.
system
(
"openssl rsautl -decrypt -inkey rsa_key.pri -in "
+
path
+
"rand.key.enc -out "
+
path
+
"rand.key"
)
os
.
remove
(
path
+
"rand.key.enc"
)
for
file
in
request
.
files
:
# if key exists and this is not it
if
key
and
file
!=
"key"
:
request
.
files
[
file
]
.
save
(
filepath
+
".enc"
)
# decrypt file with decrypted random key and store in prefix folder
os
.
system
(
"openssl enc -d -aes-256-cbc -in "
+
filepath
+
".enc -out "
+
filepath
+
" -pass file:"
+
path
+
"rand.key"
)
os
.
remove
(
filepath
+
".enc"
)
else
:
else
:
request
.
files
[
'data'
]
.
save
(
prefix
)
request
.
files
[
file
]
.
save
(
path
+
request
.
files
[
file
]
.
filename
)
filename
[
prefix
]
=
request
.
files
[
'data'
]
.
filename
return
prefix
if
key
:
os
.
remove
(
path
+
"rand.key"
)
return
str
(
process
.
datastore
(
path
))
def
newprefix
():
def
newprefix
():
prefix
=
""
prefix
=
""
...
@@ -55,80 +77,41 @@ def newprefix():
...
@@ -55,80 +77,41 @@ def newprefix():
@app.route
(
'/msg'
,
methods
=
[
'POST'
])
@app.route
(
'/msg'
,
methods
=
[
'POST'
])
def
msg
():
def
msg
():
process
=
Process
(
'messages.db'
)
id
=
request
.
values
[
'id'
]
prefix
=
request
.
values
[
'id'
]
if
prefix
not
in
filename
:
return
"El id de la data es invalido"
# symetric key was sent, decrypt data
if
os
.
path
.
exists
(
prefix
+
"_key.enc"
):
# decrypt random key with stored private key and store in host folder
os
.
system
(
"openssl rsautl -decrypt -inkey rsa_key.pri -in "
+
prefix
+
"_key.enc -out "
+
prefix
+
"_key"
)
# decrypt JSON with decrypted random key and store in dir folder
os
.
system
(
"openssl enc -d -aes-256-cbc -in "
+
prefix
+
".enc -out "
+
prefix
+
" -pass file:"
+
prefix
+
"_key"
)
# delete garbage
os
.
system
(
"rm "
+
prefix
+
".enc"
)
os
.
system
(
"rm "
+
prefix
+
"_key.enc"
)
os
.
system
(
"rm "
+
prefix
+
"_key"
)
query
=
{
query
=
{
'path'
:
prefix
,
Table
.
id
:
id
,
'file'
:
filename
.
pop
(
prefix
),
Table
.
serv
:
request
.
values
[
'serv'
],
'serv'
:
request
.
values
[
'serv'
],
Table
.
dest
:
request
.
values
[
'dest'
],
'dest'
:
request
.
values
[
'dest'
],
Table
.
type
:
request
.
values
[
'type'
]
'type'
:
request
.
values
[
'type'
]
}
}
id
=
process
.
store
(
query
)
state
=
process
.
param
store
(
query
)
return
st
r
(
id
)
return
st
ate
@app.route
(
'/cons'
,
methods
=
[
'POST'
])
@app.route
(
'/cons'
,
methods
=
[
'POST'
])
def
cons
():
def
cons
():
process
=
Process
(
'messages.db'
)
id_query
=
request
.
form
[
'id'
]
id_query
=
request
.
form
[
'id'
]
row
=
process
.
lookup
(
id_query
)
row
=
process
.
lookup
(
id_query
)
if
type
(
row
)
==
str
:
# error message
if
type
(
row
)
==
str
:
# error message
return
row
return
row
if
row
[
Table
.
state
]
==
States
.
delivered
:
if
row
[
Table
.
state
]
==
States
.
delivered
:
os
.
system
(
"rm "
+
row
[
Table
.
path
])
os
.
system
(
"rm
-r
"
+
row
[
Table
.
path
])
return
str
(
row
[
Table
.
state
])
return
str
(
row
[
Table
.
state
])
def
attempt
():
def
attempt
():
p
rocess
=
Process
(
'messages.db'
)
p
=
Process
(
'messages.db'
)
p
rocess
.
send
()
p
.
send
()
threading
.
Timer
(
retry_timer
,
attempt
)
.
start
()
threading
.
Timer
(
retry_timer
,
attempt
)
.
start
()
def
clean
():
def
clean
():
p
rocess
=
Process
(
'messages.db'
)
p
=
Process
(
'messages.db'
)
paths
=
p
rocess
.
paths
()
paths
=
p
.
paths
()
now
=
datetime
.
datetime
.
now
()
now
=
datetime
.
datetime
.
now
()
# in database (after /msg)
for
folder
in
paths
:
for
file
in
paths
:
mtime
=
os
.
path
.
getmtime
(
folder
)
mtime
=
os
.
path
.
getmtime
(
file
)
# if the folder exists for more than a X seconds, erase it and its contents
# if the file exists for more than a 23 hs, erase it
if
int
(
now
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
))
-
int
(
time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
))
>
operation_timer
:
if
int
(
now
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
-
int
(
time
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
>
23
:
os
.
system
(
"rm -r "
+
folder
)
os
.
system
(
"rm "
+
file
)
# in prefixes dictionary (after /data)
for
file
in
filename
:
# not encrypted
if
os
.
path
.
exists
(
file
):
mtime
=
os
.
path
.
getmtime
(
file
)
# if the file exists for more than a 23 hs, erase it
if
int
(
now
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
-
int
(
time
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
>
23
:
os
.
system
(
"rm "
+
file
)
filename
.
pop
(
file
)
# encrypted
elif
os
.
path
.
exists
(
file
+
".enc"
):
mtime
=
os
.
path
.
getmtime
(
file
+
".enc"
)
# if the file exists for more than a 23 hs, erase it
if
int
(
now
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
-
int
(
time
.
strftime
(
"
%
Y
%
m
%
d
%
H"
))
>
23
:
os
.
system
(
"rm "
+
file
+
".enc"
)
os
.
system
(
"rm "
+
file
+
"_key"
)
filename
.
pop
(
file
)
threading
.
Timer
(
clean_timer
,
clean
)
.
start
()
threading
.
Timer
(
clean_timer
,
clean
)
.
start
()
...
@@ -140,4 +123,8 @@ if __name__ == "__main__":
...
@@ -140,4 +123,8 @@ if __name__ == "__main__":
attempt
()
attempt
()
# starts cleaning daemon
# starts cleaning daemon
clean
()
clean
()
# remove and recreate msg folder for cleanup purposes
if
os
.
path
.
exists
(
msgfolder
):
os
.
system
(
"rm -r "
+
msgfolder
)
os
.
mkdir
(
msgfolder
)
app
.
run
(
"0.0.0.0"
)
app
.
run
(
"0.0.0.0"
)
\ No newline at end of file
\ No newline at end of file
enums.py
View file @
68d15a8
class
Table
:
class
Table
:
id
=
"id"
id
=
"id"
path
=
"path"
path
=
"path"
file
=
"file"
serv
=
"serv"
serv
=
"serv"
dest
=
"dest"
dest
=
"dest"
type
=
"type"
type
=
"type"
state
=
"state"
state
=
"state"
@staticmethod
def
validate
(
column
):
return
hasattr
(
Table
,
column
)
class
States
:
class
States
:
queued
=
"queued"
queued
=
"queued"
delivered
=
"delivered"
delivered
=
"delivered"
preprocess
=
"preprocess"
@staticmethod
def
validate
(
state
):
return
hasattr
(
States
,
state
)
class
Services
:
class
Services
:
wpp1
=
"wpp1"
wpp1
=
"wpp1"
sms
=
"sms"
@staticmethod
@staticmethod
def
validate
(
serv
):
def
validate
(
serv
):
if
serv
==
Services
.
wpp1
:
return
hasattr
(
Services
,
serv
)
return
True
return
False
class
Datatypes
:
class
Datatypes
:
text
=
"text"
text
=
"text"
image
=
"image"
image
=
"image"
document
=
"document"
document
=
"document"
link
=
"link"
link
=
"link"
audio
=
"audio"
@staticmethod
@staticmethod
def
validate
(
datatype
):
def
validate
(
datatype
):
if
datatype
==
Datatypes
.
text
:
return
hasattr
(
Datatypes
,
datatype
)
return
True
if
datatype
==
Datatypes
.
image
:
return
True
if
datatype
==
Datatypes
.
document
:
return
True
if
datatype
==
Datatypes
.
link
:
return
True
return
False
process.py
View file @
68d15a8
import
ipdb
import
ipdb
import
os
import
os
import
json
from
services
import
serviceFactory
from
services
import
serviceFactory
from
database
import
DBconnection
from
database
import
DBconnection
from
enums
import
Services
,
States
,
Datatypes
,
Table
from
enums
import
Services
,
States
,
Datatypes
,
Table
...
@@ -10,30 +11,49 @@ class Process:
...
@@ -10,30 +11,49 @@ class Process:
self
.
db
=
db
self
.
db
=
db
self
.
conn
=
DBconnection
(
db
)
self
.
conn
=
DBconnection
(
db
)
# stores the message and returns its id
# stores the data
def
store
(
self
,
query
):
def
datastore
(
self
,
path
):
entities
=
{
Table
.
path
:
path
,
Table
.
state
:
States
.
preprocess
}
id
=
self
.
conn
.
insert
(
"msg"
,
entities
)
return
id
# stores the parameters
def
paramstore
(
self
,
query
):
# service is wrong
# service is wrong
if
not
Services
.
validate
(
query
[
Table
.
serv
]):
if
not
Services
.
validate
(
query
[
Table
.
serv
]):
return
"No existe el servicio '"
+
query
[
Table
.
serv
]
+
"'"
return
"No existe el servicio '"
+
query
[
Table
.
serv
]
+
"'"
ipdb
.
set_trace
()
types
=
json
.
loads
(
query
[
Table
.
type
])
filelist
=
os
.
listdir
(
self
.
lookup
(
query
[
Table
.
id
]
)[
Table
.
path
]
)
for
file
in
types
:
# files don't exist
if
file
not
in
filelist
:
return
"El archivo '"
+
file
"' no existe"
# message can't be sent by this service
# message can't be sent by this service
if
not
serviceFactory
(
query
[
Table
.
serv
])
.
validate
(
query
[
Table
.
type
]):
elif
not
serviceFactory
(
query
[
Table
.
serv
])
.
validate
(
types
[
file
]):
return
"El servicio '"
+
query
[
Table
.
serv
]
+
"' no puede enviar el tipo '"
+
query
[
Table
.
type
]
+
"'"
return
"El servicio '"
+
query
[
Table
.
serv
]
+
"' no puede enviar el tipo '"
+
types
[
file
]
+
"' destinado al archivo '"
file
"'"
entities
=
{
entities
=
{
Table
.
path
:
query
[
Table
.
path
],
Table
.
file
:
query
[
Table
.
file
],
Table
.
dest
:
query
[
Table
.
dest
],
Table
.
dest
:
query
[
Table
.
dest
],
Table
.
serv
:
query
[
Table
.
serv
],
Table
.
serv
:
query
[
Table
.
serv
],
Table
.
type
:
query
[
Table
.
type
],
Table
.
type
:
query
[
Table
.
type
],
Table
.
state
:
States
.
queued
Table
.
state
:
States
.
queued
}
}
id
=
self
.
conn
.
insert
(
"msg"
,
entities
)
self
.
conn
.
update
(
"msg"
,(
Table
.
id
,
query
[
Table
.
id
])
,
entities
)
return
i
d
return
States
.
queue
d
# tries to send all messages available
# tries to send all messages available
def
send
(
self
):
def
send
(
self
):
rows
=
self
.
conn
.
query
(
"SELECT * FROM msg WHERE state = ?"
,(
States
.
queued
,))
rows
=
self
.
conn
.
query
(
"SELECT * FROM msg WHERE state = ?"
,(
States
.
queued
,))
for
query
in
DBconnection
.
parseToTable
(
rows
):
for
query
in
DBconnection
.
parseToTable
(
rows
):
# if f
ile doesn't exist, erase the message request, it can't be read anyway
# if f
older doesn't exist, erase the message request
if
not
os
.
path
.
exists
(
query
[
Table
.
path
]):
if
not
os
.
path
.
exists
(
query
[
Table
.
path
]):
self
.
conn
.
query
(
"DELETE FROM msg WHERE id = ?"
,(
query
[
Table
.
id
],))
self
.
conn
.
query
(
"DELETE FROM msg WHERE id = ?"
,(
query
[
Table
.
id
],))
continue
continue
...
@@ -55,7 +75,6 @@ class Process:
...
@@ -55,7 +75,6 @@ class Process:
self
.
conn
.
query
(
"DELETE FROM msg WHERE id = ?"
,(
id
,))
self
.
conn
.
query
(
"DELETE FROM msg WHERE id = ?"
,(
id
,))
entities
=
{
entities
=
{
Table
.
path
:
row
[
Table
.
path
],
Table
.
path
:
row
[
Table
.
path
],
Table
.
file
:
row
[
Table
.
file
],
Table
.
serv
:
row
[
Table
.
serv
],
Table
.
serv
:
row
[
Table
.
serv
],
Table
.
dest
:
row
[
Table
.
dest
],
Table
.
dest
:
row
[
Table
.
dest
],
Table
.
type
:
row
[
Table
.
type
],
Table
.
type
:
row
[
Table
.
type
],
...
...
services.py
View file @
68d15a8
...
@@ -3,6 +3,11 @@ import json
...
@@ -3,6 +3,11 @@ import json
import
os
import
os
from
enums
import
Table
,
Services
,
States
,
Datatypes
from
enums
import
Table
,
Services
,
States
,
Datatypes
from
abc
import
ABC
,
abstractmethod
from
abc
import
ABC
,
abstractmethod
import
smtplib
from
email.mime.multipart
import
MIMEMultipart
from
email.mime.text
import
MIMEText
from
email.mime.image
import
MIMEImage
from
email.mime.audio
import
MIMEAudio
class
ServiceBase
(
ABC
):
class
ServiceBase
(
ABC
):
...
@@ -28,24 +33,71 @@ class Wpp1(ServiceBase):
...
@@ -28,24 +33,71 @@ class Wpp1(ServiceBase):
server
=
"https://archivos.hgtsa.com.ar/"
server
=
"https://archivos.hgtsa.com.ar/"
def
send
(
self
,
data
):
def
send
(
self
,
data
):
if
data
[
Table
.
type
]
==
Datatypes
.
text
:
types
=
json
.
loads
(
data
[
Table
.
type
])
f
=
open
(
data
[
Table
.
path
])
for
file
in
types
:
filepath
=
data
[
Table
.
path
]
+
file
if
types
[
file
]
==
Datatypes
.
text
:
f
=
open
(
filepath
)
text
=
f
.
read
()
text
=
f
.
read
()
f
.
close
()
f
.
close
()
result
=
requests
.
get
(
url
=
Wpp1
.
URL
+
Wpp1
.
URLmode
[
data
[
Table
.
typ
e
]],
params
=
{
'token'
:
Wpp1
.
token
,
'uid'
:
Wpp1
.
uid
,
'to'
:
data
[
Table
.
dest
],
'text'
:
text
})
result
=
requests
.
get
(
url
=
Wpp1
.
URL
+
Wpp1
.
URLmode
[
types
[
fil
e
]],
params
=
{
'token'
:
Wpp1
.
token
,
'uid'
:
Wpp1
.
uid
,
'to'
:
data
[
Table
.
dest
],
'text'
:
text
})
return
result
.
json
()[
'success'
]
return
result
.
json
()[
'success'
]
else
:
else
:
path
=
requests
.
post
(
url
=
Wpp1
.
server
,
files
=
{
'data'
:
(
str
(
data
[
Table
.
file
]),
open
(
data
[
Table
.
path
]
,
'rb'
))
})
path
=
requests
.
post
(
url
=
Wpp1
.
server
,
files
=
{
'data'
:
(
file
,
open
(
filepath
,
'rb'
))
})
result
=
requests
.
get
(
url
=
Wpp1
.
URL
+
Wpp1
.
URLmode
[
data
[
Table
.
type
]],
params
=
{
'token'
:
Wpp1
.
token
,
'uid'
:
Wpp1
.
uid
,
'to'
:
data
[
Table
.
dest
],
'url'
:
Wpp1
.
server
+
path
.
text
})
result
=
requests
.
get
(
url
=
Wpp1
.
URL
+
Wpp1
.
URLmode
[
data
[
Table
.
type
]],
params
=
{
'token'
:
Wpp1
.
token
,
'uid'
:
Wpp1
.
uid
,
'to'
:
data
[
Table
.
dest
],
'url'
:
Wpp1
.
server
+
path
.
text
})
return
result
.
json
()[
'success'
]
return
result
.
json
()[
'success'
]
def
validate
(
self
,
datatype
):
def
validate
(
self
,
datatype
):
return
Datatypes
.
validate
(
datatype
)
return
datatype
==
Datatypes
.
text
or
datatype
==
Datatypes
.
image
or
datatype
==
Datatypes
.
document
or
datatype
==
Datatypes
.
link
class
SMS
(
ServiceBase
):
def
__init__
(
self
):
self
.
__username
=
"prueba@anacsoft.com"
self
.
__password
=
"prueba2019"
self
.
s
=
smtplib
.
SMTP
(
host
=
"mail.anacsoft.com"
,
port
=
26
)
self
.
s
.
starttls
()
self
.
s
.
login
(
self
.
__username
,
self
.
__password
)
def
send
(
self
,
data
):
msg
=
MIMEMultipart
()
msg
[
'From'
]
=
self
.
_SMS__username
msg
[
'To'
]
=
data
[
Table
.
dest
]
msg
[
'Subject'
]
=
"Test"
types
=
json
.
loads
(
data
[
Table
.
type
])
for
file
in
types
:
filepath
=
data
[
Table
.
path
]
+
file
msg
.
attach
(
self
.
MIMEmode
(
filepath
,
types
[
file
]))
self
.
s
.
send_message
(
msg
)
return
True
def
MIMEmode
(
self
,
path
,
type
):
data
=
open
(
path
,
'rb'
)
mode
=
None
if
type
==
Datatypes
.
text
:
mode
=
MIMEText
(
data
.
read
(),
'plain'
)
if
type
==
Datatypes
.
image
:
mode
=
MIMEImage
(
data
.
read
())
if
type
==
Datatypes
.
audio
:
mode
=
MIMEAudio
(
data
.
read
())
f
.
close
()
return
mode
def
validate
(
self
,
datatype
):
if
datatype
==
Datatypes
.
text
:
return
True
if
datatype
==
Datatypes
.
image
:
return
True
if
datatype
==
Datatypes
.
audio
:
return
True
return
False
def
serviceFactory
(
serv
):
def
serviceFactory
(
serv
):
if
serv
==
Services
.
wpp1
:
if
serv
==
Services
.
wpp1
:
return
Wpp1
()
return
Wpp1
()
if
serv
==
Services
.
sms
:
return
SMS
()
return
None
return
None
test_flask.py
0 → 100644
View file @
68d15a8
from
flask
import
Flask
,
render_template
from
flask_testing
import
TestCase
import
unittest
class
RootTest
(
TestCase
):
render_templates
=
True
def
create_app
(
self
):
app
=
Flask
(
__name__
)
app
.
config
[
'Testing'
]
=
True
@app.route
(
'/'
)
def
main
():
return
render_template
(
'index.html'
)
return
app
def
test_something
(
self
):
response
=
self
.
client
.
get
(
"/"
)
self
.
assertEqual
(
response
.
data
.
decode
(
'utf-8'
),
"Nothing to see here..."
,
response
)
if
__name__
==
"__main__"
:
unittest
.
main
()
\ No newline at end of file
\ No newline at end of file
Write
Preview
Styling with
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment