# main.py (9.7 KB)
#-*- coding: utf-8 -*-
from flask import Flask, render_template, request, url_for, make_response, redirect
from iselenium import SeleniumInterface as SI
import random, json, os, datetime, ingenieros,apollo, fosadiaria

# Shared engineer lookup; report() calls ing.LeerDatosUsuario() with the value
# of the "login" cookie and reads the 'nombreyapellido' entry of the result.
ing = ingenieros.ingenieros()
# Shared lookup object; anomalies() and report() call
# FozaDiaria.BuscarDatosDominio() with the upper-cased plate and the browser
# session, and store str() of the result under header['hora'].
FozaDiaria = fosadiaria.FosaDiaria()

# Flask application that the @app.route decorators in this file register on.
app = Flask(__name__)

def create():
	"""Make sure the user store ``data/users.json`` exists.

	Creates the ``data`` directory when it is missing and writes an empty
	JSON object (``{}``) when the file does not exist yet. An existing file
	is never modified.
	"""
	path = "data/users.json"
	# Without the directory, open() below would fail with FileNotFoundError.
	os.makedirs(os.path.dirname(path), exist_ok=True)
	try:
		# "x" creates the file only when it is absent, so there is no window
		# between testing for the file and creating it.
		with open(path, "x") as f:
			f.write("{}")
	except FileExistsError:
		# Already initialised: keep whatever is stored.
		pass

def usersave(usr, psw):
	"""Store (or overwrite) the password of ``usr`` in ``data/users.json``.

	The file holds a single JSON object mapping user name to password.
	Unparseable content, or content that is not a JSON object, is replaced
	by a fresh mapping containing only this user.

	NOTE(review): the password is written exactly as received (plain text);
	anomalies() and report() read it back through userget() and pass it to
	login(). Confirm this storage is acceptable.

	Args:
		usr: user name used as the key.
		psw: password stored for that user.
	"""
	create()
	with open("data/users.json", "r") as u:
		raw = u.read()

	try:
		users = json.loads(raw)
	except ValueError:
		# Covers json.JSONDecodeError: corrupt or empty file.
		users = {}
	if not isinstance(users, dict):
		users = {}

	users[usr] = psw

	# Serialise before opening for writing: opening with "w" truncates the
	# file, so a serialisation error afterwards would wipe every stored user.
	serialized = json.dumps(users)
	with open("data/users.json", "w") as u:
		u.write(serialized)

def userget(usr):
	"""Return the password stored for ``usr`` in ``data/users.json``.

	Args:
		usr: user name to look up.

	Returns:
		The stored value, or ``None`` when the file content is not valid
		JSON, is not a mapping that can be indexed by ``usr``, or has no
		entry for ``usr``.
	"""
	create()
	with open("data/users.json", "r") as u:
		raw = u.read()

	try:
		return json.loads(raw)[usr]
	except (ValueError, LookupError, TypeError):
		# ValueError: invalid JSON; LookupError: no such user;
		# TypeError: stored JSON is not indexable by a user name.
		return None

@app.route('/')
def main():
	"""Serve ``form.html`` with ``bypass`` off.

	Requests that carry no ``login`` cookie are redirected to the login
	endpoint instead.
	"""
	if "login" in request.cookies:
		return render_template("form.html", bypass = False)

	return redirect(url_for("login"))

@app.route('/login', methods = ['GET', 'POST'])
def login():
	"""Login endpoint.

	GET renders ``login.html``. POST expects a JSON body with the keys
	``usuario`` and ``contrasena``, attempts a login on the remote site with
	a fresh browser session and, on success, sets the ``login`` cookie (8
	hours, HttpOnly) and stores the credentials with usersave().

	Returns:
		The rendered login page (GET), a response whose body is the JSON
		encoded login result (POST, success), or the plain string
		``"Error en el login <detail>"`` (POST, any failure).

	NOTE: this function shares its name with the helper ``login(u, p, s)``
	defined further down in this file. That later ``def`` rebinds the module
	level name, so the call below reaches the helper, not this view.

	NOTE(review): the cookie value is the bare user name and the other
	routes only test that the cookie exists — confirm whether it should be
	signed or replaced by a server-side session.
	"""
	if request.method == "GET":
		return render_template(
			"login.html"
		)

	# Bound before the try so the cleanup in `finally` is valid even when the
	# browser could not be started.
	s = None
	try:
		# Read the payload first: a malformed request fails here without
		# launching a browser.
		usuario = request.json['usuario']
		contrasena = request.json['contrasena']

		s = SI(SI.Chrome)
		r = make_response(
			json.dumps(
				login(usuario, contrasena, s)
			)
		)
		r.set_cookie(
			"login",
			usuario,
			60 * 60 * 8,
			httponly = True
		)
		usersave(usuario, contrasena)
		return r
	except Exception as E:
		return f"Error en el login {str(E)}"
	finally:
		# Best-effort close on every path; a failing quit() must not turn a
		# successful login into an error.
		TratarCerrarNabegador(s)
		
def TratarCerrarNabegador(s):
	"""Try to close the browser behind ``s`` without ever failing the caller.

	Args:
		s: object exposing ``driver.quit()``, or ``None`` when no browser was
			started.

	Ordinary errors raised while quitting are ignored on purpose (best-effort
	cleanup). KeyboardInterrupt and SystemExit are allowed to propagate.
	"""
	if s is None:
		return
	try:
		s.driver.quit()
	except Exception:
		# The session may already be closed or half-initialised; nothing
		# useful can be done about it here.
		pass

@app.route('/manual')
def manual():
	"""Serve ``form.html`` with ``bypass`` on.

	Requests that carry no ``login`` cookie are redirected to the login
	endpoint instead.
	"""
	if "login" in request.cookies:
		return render_template("form.html", bypass = True)

	return redirect(url_for("login"))

@app.route('/anomalies', methods = ['POST'])
def anomalies():
	"""Collect the anomalies recorded for a plate and render them.

	Expects a JSON body containing ``header.patente``. Using the credentials
	stored for the user named in the ``login`` cookie, it logs in on the
	remote site, opens the plate's technical sheet, gathers the anomalies and
	the administrative data, and renders ``anomalies.html``.

	Returns:
		A redirect to the login endpoint when the ``login`` cookie is
		missing, a plain error string when one of the scraping steps fails,
		or the rendered ``anomalies.html`` page.
	"""
	if "login" not in request.cookies:
		return redirect(url_for("login"))

	d = request.json
	plate = d['header']['patente']
	anom = {
		"header" : {
			"patente" : plate
		}
	}

	s = SI(SI.Chrome)
	# Everything that uses the browser lives inside try/finally so the session
	# is closed on every exit path, including the early error returns and
	# unexpected exceptions.
	try:
		try:
			login(
				request.cookies["login"],
				userget(
					request.cookies["login"]
				),
				s
			)
		except Exception:
			return "Error en el login"

		fingfd = FozaDiaria.BuscarDatosDominio(plate.upper(),selenium=s)

		try:
			gototec(s, d)
		except Exception:
			return f"Error yendo a las especificaciones técnicas del dominio '{plate}'."
		try:
			anom = fetchAnomalies(s, anom)
		except Exception:
			return f"Fallo en la recolección de anomalías del dominio '{plate}'."
		try:
			anom = gotoadmin(s, anom)
		except Exception:
			return f"Error yendo a los datos administrativos del dominio '{plate}'."
	finally:
		TratarCerrarNabegador(s)

	anom['header']['patente'] = plate
	anom['header']['fecha'] = datetime.datetime.now().strftime("%d/%m/%Y")
	anom['header']["hora"] = str(fingfd)
	return render_template("anomalies.html", anomalies = anom)

@app.route('/report', methods = ['POST'])
def report():
	"""Build the inspection report for a plate and return it as JSON text.

	Expects a JSON body containing ``header.patente`` and ``suspension``.
	Using the credentials stored for the user named in the ``login`` cookie,
	it logs in on the remote site, opens the plate's technical sheet, reads
	the measured values and completes the brake data with rnddata().

	Returns:
		A redirect to the login endpoint when the ``login`` cookie is
		missing, a plain error string when one of the steps fails, or the
		JSON-encoded report.
	"""
	if "login" not in request.cookies:
		return redirect(url_for("login"))

	d = request.json
	plate = d['header']['patente']

	answer = {
		"header" : {},
		"alineador" : {},
		"suspension" : d['suspension'],
		"frenos" : {},
		"trasero" : {},
		"gaseshumos" : {},
	}

	answer['header']['patente'] = plate.upper()
	answer['header']["fecha-hora"] = apollo.estaticos.FechaHora()
	answer['header']["ingeniero"] = ing.LeerDatosUsuario(request.cookies["login"])['nombreyapellido']

	# The browser is started only after the header is built, so a failure in
	# the lookups above cannot leave a browser process behind.
	s = SI(SI.Chrome)
	# try/finally closes the session on every exit path, including the early
	# error returns and unexpected exceptions.
	try:
		try:
			login(
				request.cookies["login"],
				userget(
					request.cookies["login"]
				),
				s
			)
		except Exception:
			return "Error en el login"

		fingfd = FozaDiaria.BuscarDatosDominio(plate.upper(),selenium=s)

		try:
			answer = gototec(s, answer)
		except Exception as E:
			return f"Error yendo a las especificaciones técnicas del dominio '{plate}'. {str(E)}"
		try:
			answer = readdata(s, answer)
		except Exception:
			return f"Error leyendo datos de la patente '{plate}'."
		try:
			answer = rnddata(answer)
		except Exception:
			return f"Error completando datos extra de la patente '{plate}'."

		# Overrides the 'hora' value written by gototec().
		answer['header']["hora"] = str(fingfd)
	finally:
		TratarCerrarNabegador(s)

	return json.dumps(answer)

def login(u, p, s):
	"""Log in on the remote site through the browser session ``s``.

	NOTE: this definition rebinds the module-level name ``login`` that the
	``/login`` view function above also uses. Calls to ``login(...)`` made
	inside the route handlers therefore reach this helper.

	Args:
		u: user name typed into the ``j_username`` field.
		p: password typed into the ``j_password`` field.
		s: browser session wrapper providing ``get``, ``find`` and ``write``.

	Returns:
		``True`` when the user name field is no longer found after
		submitting the form.

	Raises:
		Exception: ``"Fallo del login"`` when the user name field is still
			found after submitting, i.e. the login page is still showing.
	"""
	s.get("https://rto.cent.gov.ar/rto")

	username_field = s.find(SI.By.NAME, "j_username")
	s.write(username_field, u)

	password_field = s.find(SI.By.NAME, "j_password")
	s.write(password_field, p)

	s.find(SI.By.ID, "submit").click()

	# Still on the login page: the credentials were rejected.
	if s.find(SI.By.NAME, "j_username") is not None:
		raise Exception("Fallo del login")

	return True

def gototec(s, r, max_pages = None):
	"""Open the technical data page of the plate in ``r['header']['patente']``.

	Loads the list of inspection sheets and pages through it with the
	'Siguiente' link until a row containing the plate is found. The text of
	that row's fifth cell is split on a space into ``r['header']['fecha']``
	and ``r['header']['hora']``. It then clicks the last link of the row's
	last cell and, when the last 'externo' link on the resulting page reads
	"ir a Datos Técnicos", clicks that too.

	Args:
		s: browser session wrapper.
		r: dict with ``r['header']['patente']``; updated in place.
		max_pages: optional upper bound on the number of list pages examined.
			``None`` (default) keeps paging for as long as a 'Siguiente' link
			is found.

	Returns:
		``r``, with ``fecha`` and ``hora`` filled in under ``header``.

	Raises:
		LookupError: when there is no further page to try, or ``max_pages``
			pages were examined without finding the plate.
	"""
	s.get("https://rto.cent.gov.ar/rto/RTO/listaDePlanillas")

	plate = r['header']['patente']
	# NOTE(review): the plate is placed into the XPath unescaped; a value
	# containing a single quote produces an invalid expression.
	row_xpath = f"//tr//td[text()='{plate}']"

	columns = None
	pages_checked = 0
	while columns is None:
		try:
			# Sibling cells of the <td> whose text is the plate.
			cells = s.children(s.parent( s.find(SI.By.XPATH, row_xpath) ))
			r['header']['fecha'], r['header']['hora'] = s.readElement(cells[4]).split(" ")
			columns = cells
		except Exception as exc:
			# Plate not on this page (or the row could not be read): move on.
			pages_checked += 1
			if max_pages is not None and pages_checked >= max_pages:
				raise LookupError(
					f"No se encontró el dominio '{plate}' en las primeras {max_pages} páginas."
				) from exc

			next_link = s.find(SI.By.XPATH, "//a[text()='Siguiente']")
			if next_link is None:
				raise LookupError(
					f"No se encontró el dominio '{plate}' en la lista de planillas."
				) from exc
			next_link.click()

	# Collect the <a> tags of the last cell and click the last one.
	# NOTE(review): the "1-" argument appears to request every match instead
	# of a single element — confirm against SeleniumInterface.
	options = s.findFromElement(columns[-1], SI.By.TAG_NAME, "a", "1-")
	options[-1].click()

	# If the last 'externo' link is 'ir a Datos Técnicos', follow it;
	# otherwise the technical data page is already showing.
	tec = s.find(SI.By.XPATH, "//a/span[@class='externo']/parent::*", "1-")[-1]

	if tec.get_attribute("innerText") == "ir a Datos Técnicos":
		tec.click()

	return r

# Precondition: the browser is already showing the technical data page.
def gotoadmin(s, r):
	"""Read the operator's data from the administrative page into ``r``.

	Clicks the first 'externo' link, opens the '#titularOperador' tab and
	stores, under ``r['header']``, the text after the first colon of two
	elements inside ``datosOperador``: ``rsocial`` and ``cp``.

	Args:
		s: browser session wrapper.
		r: dict with a ``header`` mapping; updated in place.

	Returns:
		``r``.
	"""
	def after_colon(element):
		# "Label: value" -> "value"
		text = element.get_attribute("innerText")
		return text.split(":")[1].strip()

	external_links = s.find(SI.By.XPATH, "//a/span[@class='externo']/parent::*", "1-")
	external_links[0].click()

	operator_tab = s.find(SI.By.XPATH, "//a[@href='#titularOperador']")
	operator_tab.click()

	# Both elements are located before either one is read.
	name_node = s.find(SI.By.XPATH, "//div[@id='datosOperador']//fieldset[2]/div")
	postal_node = s.find(SI.By.XPATH, "//div[@id='datosOperador']//fieldset[3]/div[4]")

	header = r['header']
	header['rsocial'] = after_colon(name_node)
	header['cp'] = after_colon(postal_node)

	return r

def readdata(s, r):
	"""Fill ``r`` with the values read from the technical data page.

	Every value is read from the input with the given element id; a read
	that fails, or that yields an empty string, is recorded as ``"?"``.

	Args:
		s: browser session wrapper.
		r: report dict with the sections ``alineador``, ``suspension``,
			``frenos``, ``trasero`` and ``gaseshumos``; updated in place.

	Returns:
		``r``.
	"""
	def field(element_id):
		# Read one input by id, falling back to "?" on error or empty value.
		reader = _attempt(
			lambda session: session.readInput( session.find(session.By.ID, element_id) ),
			"?"
		)
		return _e2q(reader(s))

	# alignment
	r['alineador']['eje_delantero'] = field("deriva")

	# suspension
	sus = r['suspension']
	for axle in (1, 2):
		left_key = f'rendimiento_izquierdo_{axle}'
		right_key = f'rendimiento_derecho_{axle}'
		weight_key = f'peso_estatico_{axle}'

		if not (sus[left_key].isnumeric() or sus[right_key].isnumeric()):
			# Neither performance value is numeric: blank the whole axle.
			sus[left_key] = "?"
			sus[right_key] = "?"
			sus[weight_key] = "?"
			continue

		# At least one performance value is numeric: read the weight.
		sus[weight_key] = field(f"pesoBascula-{axle - 1}")

	r['suspension'].update(sus)

	# brakes
	for index in range(4):
		axle = index + 1
		axle_values = {
			f'peso_estatico_{axle}' : field(f"pesoBascula-{index}"),
			f'fuerza_izquierda_{axle}' : field(f"fuerzaIzq-{index}"),
			f'fuerza_derecha_{axle}' : field(f"fuerzaDer-{index}"),
		}
		r['frenos'].update(axle_values)

	# rear brake
	rear_fields = (
		('peso_estatico', "pesoBasculaEst-0"),
		('fuerza_izquierda', "fuerzaIzqEst-0"),
		('fuerza_derecha', "fuerzaDerEst-0"),
		('eje', "nroEjeEst-0"),
	)
	for key, element_id in rear_fields:
		r['trasero'][key] = field(element_id)

	# gases and smoke
	emission_fields = (
		('opacidad_logaritmica', "opacidadLogaritmica"),
		('co', "cantCO"),
		('hc', "cantHC"),
	)
	for key, element_id in emission_fields:
		r['gaseshumos'][key] = field(element_id)

	return r

def rnddata(r):
	"""Add resistance and ovality values to each of the four brake axles.

	An axle counts as present when at least one of its ``fuerza_izquierda``,
	``fuerza_derecha`` or ``peso_estatico`` entries in ``r['frenos']`` is not
	``"?"``. Present axles get random values rounded to two decimals
	(resistance from ``random() * 0.9 + 0.05``, ovality from
	``random() * 39 + 0.5``); absent axles get ``"?"``.

	Args:
		r: report dict whose ``frenos`` section is updated in place.

	Returns:
		``r``.
	"""
	def resistance():
		return round(random.random() * 0.9 + 0.05, 2)

	def ovality():
		return round(random.random() * 39 + 0.5, 2)

	# Output key prefix and its generator, in the order the values are drawn.
	generated = (
		('resistencia_izquierda', resistance),
		('resistencia_derecha', resistance),
		('ovalidad_izquierda', ovality),
		('ovalidad_derecha', ovality),
	)
	measured = ('fuerza_izquierda', 'fuerza_derecha', 'peso_estatico')

	for axle in range(1, 5):
		brakes = r['frenos']
		# Any measured value means the axle exists.
		present = any(brakes[f'{name}_{axle}'] != "?" for name in measured)

		for prefix, make in generated:
			if present:
				brakes[f'{prefix}_{axle}'] = make()
			else:
				brakes[f'{prefix}_{axle}'] = "?"

	return r

def fetchAnomalies(s, r):
	"""Collect the saved anomalies of the current sheet into ``r``.

	Opens the anomalies tab (element id ``ui-id-2``), reads every row of the
	saved-anomalies table and groups the rows by anomaly type.

	Args:
		s: browser session wrapper.
		r: result dict; receives the key ``anomalies``.

	Returns:
		``r``. When the table rows cannot be obtained (``s.children`` yields
		``None``) ``r`` is returned unchanged, without an ``anomalies`` key.
		Otherwise ``r['anomalies']`` maps each anomaly type to a list of
		``{'severity': ..., 'description': ...}`` dicts.
	"""
	checkbox_xpath = "input[@type='checkbox']"

	def checked(Maybe_checkbox):
		# A missing checkbox counts as not selected.
		return Maybe_checkbox is not None and Maybe_checkbox.is_selected()

	def severity(row):
		# The second and third cells hold the two severity checkboxes.
		hig = s.findFromElement(row[1], SI.By.XPATH, checkbox_xpath)
		med = s.findFromElement(row[2], SI.By.XPATH, checkbox_xpath)

		if checked(hig):
			return "Grave"
		if checked(med):
			return "Moderada"
		return "Leve"

	def anomalyType(row):
		# Text of the fifth cell up to the first '>'.
		return row[4].get_attribute('innerText').split('>')[0].strip()

	def description(row):
		textarea = s.findFromElement(row[5], SI.By.TAG_NAME, "textarea")
		return textarea.get_attribute("value")

	result = {}

	# Click the anomalies tab
	s.find(SI.By.ID, "ui-id-2").click()

	# Anomaly table, if one exists
	rows = s.children( s.find(SI.By.XPATH, "//div[@id='tableAnomaliaRevisionGuardadaDiv']/div/div/table/tbody") )
	if rows is None:
		return r

	# Group the rows by anomaly type
	for row in rows:
		c = s.children(row)
		result.setdefault(anomalyType(c), []).append({
			'severity' : severity(c),
			'description' : description(c)
		})

	r['anomalies'] = result
	return r

# Wraps a callable so that a failing call yields a default value or a custom error
def _attempt(f, default = "", error = ""):
	"""Return a wrapper around ``f`` that tolerates ordinary exceptions.

	Args:
		f: callable to wrap.
		default: value the wrapper returns when ``f`` raises and ``error``
			is empty.
		error: when not empty, the wrapper raises ``Exception(error)``
			instead of returning ``default``.

	Returns:
		A function accepting the same arguments as ``f``.

	Only ``Exception`` subclasses are intercepted; KeyboardInterrupt and
	SystemExit pass through untouched.
	"""
	def inner(*args, **kwargs):
		try:
			return f(*args, **kwargs)
		except Exception as exc:
			if error != "":
				# Keep the original failure attached as the cause.
				raise Exception(error) from exc
			return default
	return inner

def _e2q(string):
	"""Map the empty string to ``"?"``; return any other value unchanged."""
	if string == "":
		return "?"
	return string

# Service entry point
if __name__ == "__main__":
	# Listen on every network interface, port 13000.
	app.run(host = "0.0.0.0", port = 13000)