Files
softone_zadanie/scraper.py
2023-09-28 17:08:50 +02:00

315 lines
9.9 KiB
Python

import requests
import re
import unicodedata
from bs4 import BeautifulSoup
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor
from pymongo import InsertOne
from app.config import settings
from app.db import connect_db, disconnect_db
# variables for custom data parsing
# Keys whose record entry holds a single active value (flattened into the
# result) rather than a list of values — see get_data(allow_multiple_active).
single_value = ["Obchodné meno:", "Sídlo:", "IČO:", "Deň zápisu:", "Právna forma:"]
# Maps a key label to a non-default value_type understood by process_entry;
# keys not listed here are parsed with the default "text" type.
value_type_dict = {
    "IČO:": "number",
    "Spoločníci:": "spolocnici",
    "Výška vkladu každého spoločníka:": "vklad"
}
def scrape_orsr():
    """
    Main entry point: scrapes data from the endpoint defined in config and
    stores it in MongoDB.

    Downloads the list of changed records, asks the user which record type to
    fetch, then fans the record urls out over a thread pool.
    """
    print("#########################")
    print("Starting ORSR scraper")
    # get all links from the orsr url
    print("Downloading changed records..")
    url = settings["base_url"] + settings["endpoint"]
    proxies = {}
    if (pr := settings["http_proxy"]) is not None:
        proxies.update({"http": pr})
        print(f"Found http proxy: {pr}")
    if (pr := settings["https_proxy"]) is not None:
        proxies.update({"https": pr})
        print(f"Found https proxy: {pr}")
    # timeout prevents the scraper from hanging forever on a dead endpoint;
    # raise_for_status surfaces HTTP errors instead of parsing an error page
    html = requests.get(url, proxies=proxies, timeout=30)
    html.raise_for_status()
    print("All changed records downloaded.")
    # use bs4 to parse the page
    soup = BeautifulSoup(html.content, "html.parser")
    # choice between Aktualny and Uplny
    m_type = input("Choose which type of records do you want to download:\n[1] 'Aktuálne'\n[2] 'Úplné' (default)\n")
    if m_type == "1":
        record_type = "Aktuálny"
        print("Record type is 'Aktuálny'")
    else:
        record_type = "Úplný"
        print("Record type is 'Úplný'")
    records = soup.find_all("a", string=record_type)
    # add base_url to relative href links
    records = [settings["base_url"] + record["href"] for record in records]
    print(f"There were {len(records)} records found.")
    # distribute the work in #of threads defined in config (round-robin split)
    parts = [records[i::settings["threads"]] for i in range(settings["threads"])]
    print(f"Processing {len(records)} records using {settings['threads']} threads:")
    # size the pool to match the partitioning so every part runs concurrently
    with ThreadPoolExecutor(max_workers=settings["threads"]) as t:
        futures = [t.submit(process_records, part, thread_id + 1)
                   for thread_id, part in enumerate(parts)]
        # read the results so worker exceptions propagate instead of being
        # silently dropped by the executor
        for f in futures:
            f.result()
    print("All records_processed")
    print("Closing ORSR Scraper...")
    print("#########################")
def process_records(records, thread):
    """
    Worker for processing records in a thread.

    Downloads and parses each record url, collects the results as bulk
    inserts, and writes them to MongoDB in one batch.
    :param records: list of urls of records to process
    :param thread: thread id of the processing thread (progress-bar label)
    """
    data = []
    # add status bar for processing the records
    for url in tqdm(records, desc=f"thread {thread}"):
        try:
            record = process_record(url)
            data.append(InsertOne(record))
        except Exception as e:
            # best-effort: one broken record must not kill the whole batch
            print(f"When downloading and parsing record {url} following error occured: {e}")
    # pymongo's bulk_write raises InvalidOperation on an empty request list,
    # so skip the DB round-trip when every record in this batch failed
    if not data:
        return
    # store processed records in db; always release the connection
    collection = connect_db()
    try:
        collection.bulk_write(data)
    finally:
        disconnect_db(collection)
def process_record(url):
    """
    Process one record: scrape the url and parse the page into a dictionary.
    :param url: url of the record
    :return: dictionary of record parameters (see get_record_data)
    """
    proxies = {}
    if (pr := settings["http_proxy"]) is not None:
        proxies.update({"http": pr})
    if (pr := settings["https_proxy"]) is not None:
        proxies.update({"https": pr})
    # timeout keeps a single dead record url from hanging the worker thread;
    # raise_for_status turns HTTP errors into exceptions the caller logs
    html = requests.get(url, proxies=proxies, timeout=30)
    html.raise_for_status()
    soup = BeautifulSoup(html.content, "html.parser")
    return get_record_data(soup)
def get_oddiel(soup):
    """
    Helper function to get "Oddiel" from the record page.
    :param soup: website data
    :return: dictionary with value: oddiel
    """
    label = soup.find("span", class_="tl", string=re.compile("Oddiel:"))
    value_span = label.parent.find("span", class_="ra")
    return {"value": value_span.text.strip()}
def get_vlozka(soup):
    """
    Helper function to get "Vložka" from the record page.
    :param soup: website data
    :return: dictionary with value: vlozka
    """
    label = soup.find("span", class_="tl", string=re.compile("Vložka"))
    value_span = label.parent.find("span", class_="ra")
    return {"value": value_span.text.strip()}
def get_aktualizaciaUdajov(soup):
    """
    Helper function to get the date of "Dátum aktualizácie údajov".
    :param soup: website data
    :return: dictionary with value: aktualizacia
    """
    label_td = soup.find("td", class_="tl", string=re.compile("Dátum aktualizácie"))
    date_td = label_td.find_next_sibling("td")
    return {"value": date_td.text.strip()}
def get_vypisUdajov(soup):
    """
    Helper function to get the date of "Dátum výpisu".
    :param soup: website data
    :return: dictionary with value: vypis
    """
    label_td = soup.find("td", class_="tl", string=re.compile("Dátum výpisu"))
    date_td = label_td.find_next_sibling("td")
    return {"value": date_td.text.strip()}
def get_data(data_td, value_type="text", allow_multiple_active=True):
    """
    Generic function to retrieve data for one key.
    :param data_td: <td>-element containing the data
    :param value_type: type of value that we want to retrieve. Default value
        is "text"; other values are defined in value_type_dict
    :param allow_multiple_active: if True, a list of active values is
        returned under "values"; otherwise the single (first) active value is
        flattened directly into the result
    :return: dictionary of data for the entry
    """
    data = {}
    # lists holding the data for one key in the record
    values = []      # currently valid entries
    old_values = []  # historical (no longer active) entries
    # get multiple entries (one inner <table> per entry)
    for entry in data_td.find_all("table"):
        value, valid_from, valid_until, active = process_entry(entry, value_type)
        if value is None:
            continue
        item = {"value": value, "valid_from": valid_from, "valid_until": valid_until}
        (values if active else old_values).append(item)
    if not allow_multiple_active:
        # single-value keys: flatten the first active entry into the result
        if values:
            data.update(values[0])
    else:
        data.update({"values": values})
    data.update({"old_values": old_values})
    return data
def get_record_data(soup):
    """
    Retrieve data for one record
    :param soup: souped-html for the record
    :return: dictionary with record data
    """
    record = {
        "oddiel": get_oddiel(soup),
        "vlozka": get_vlozka(soup)
    }
    # find the last table before variable data
    entry = soup.find("span", class_="tl", string=re.compile("Oddiel:")).parent.parent.parent
    # retrieve all keys for a record. Since there are multiple different record types with different keys,
    # the keys of the record are created automatically from available data
    while True:
        # walk forward one sibling <table> per iteration
        entry = entry.find_next_sibling("table")
        entry_tr = entry.find_all("tr")
        # keep only the rows that are direct children of the same parent as
        # the first row (drops rows belonging to nested inner tables)
        entry_tr = [i for i in entry_tr if i.parent == entry_tr[0].parent]
        if len(entry_tr) > 1: # last table with "Dátum aktualizácie údajov
            break
        # get key name and key data: first <td> is the label, second the data
        key_container = entry_tr[0].find_all("td")
        key_name = key_container[0].text.strip()
        # check if multiple active allowed and the value_type
        allow_multiple_active = True
        value_type = "text"
        if key_name in single_value:
            allow_multiple_active = False
        if (v_type := value_type_dict.get(key_name)) is not None:
            value_type = v_type
        # normalize the Slovak label into a camelCase ascii key
        key_name = transform_key_name(key_name)
        # reads the data of the key
        key_data = get_data(key_container[1], value_type=value_type, allow_multiple_active=allow_multiple_active)
        record.update({key_name: key_data})
    record.update({
        "aktualizaciaUdajov": get_aktualizaciaUdajov(soup),
        "vypisUdajov": get_vypisUdajov(soup)
    })
    return record
def transform_key_name(name):
    """
    Helper function to create a camelCase key name.

    Strips diacritics (NFKD + ascii-ignore), removes colons, lowercases and
    joins the words in camelCase, e.g. "Obchodné meno:" -> "obchodneMeno".
    :param name: string with input data (from ORSR)
    :return: camelCase key name ("" for empty/whitespace-only input)
    """
    s = unicodedata.normalize("NFKD", name).encode('ascii', 'ignore').decode().replace(":", "").lower().split()
    # guard against empty input, which would otherwise raise IndexError
    if not s:
        return ""
    # s is already lowercase; only the tail words need capitalizing
    return s[0] + "".join(w.capitalize() for w in s[1:])
def process_entry(entry, value_type):
    """
    extracts one entry from the table of entries for a given data
    :param entry: one table element of data
    :param value_type: type of the value data
    :return: tuple: (value, valid_from, valid_until, active)
    """
    value, valid_from, valid_until, active = None, None, None, False
    # assumes the entry table has exactly two cells: value + validity dates
    # — TODO confirm against the ORSR markup
    value_td, valid_td = entry.find_all("td")
    # Check if active entry (css class "ra" marks a currently valid value)
    if value_td.span.attrs["class"][0] == "ra":
        active = True
    # get clean lines from multiline entries: <br> tags become newline
    # markers, span texts are joined, then split back into non-empty lines
    lines = [f.strip() for f in " ".join(["\n" if x.name == "br" else x.text.strip() for x in value_td.find_all(["br","span"])]).split("\n") if f]
    # parse data according to value_type
    if value_type == "text":
        value = ", ".join(lines)
    elif value_type == "number":
        # e.g. IČO: digits may be space-separated across spans
        value = int("".join(lines).replace(" ",""))
    elif value_type == "spolocnici":
        # first line is the partner name, remaining lines form the address
        spolocnik = lines[0]
        adresa = ", ".join(lines[1:])
        value = {
            "spolocnik": spolocnik,
            "adresa": adresa
        }
    elif value_type == "vklad":
        # first line is the partner name, remaining lines describe the deposit
        spolocnik = lines[0]
        vklad = ", ".join(lines[1:])
        value = {
            "spolocnik": spolocnik,
            "vklad": vklad
        }
    valid_from, valid_until = parse_oddo(valid_td.text.strip())
    return value, valid_from, valid_until, active
def parse_oddo(text):
    """
    Parses the valid_from and valid_until dates from a string.
    :param text: od_do_dates in format: "(od: DD.MM.YYYY do: DD.MM.YYYY)"
    :return: tuple (valid_from, valid_until); missing parts are ""
    """
    valid_from = ""
    valid_until = ""
    # dates are fixed-width (DD.MM.YYYY = 10 chars) after each marker
    pos = text.find("od: ")
    if pos != -1:
        valid_from = text[pos + 4:pos + 14]
    pos = text.find("do: ")
    if pos != -1:
        valid_until = text[pos + 4:pos + 14]
    return valid_from, valid_until
# Run the scraper only when executed as a script, not on import.
if __name__ == "__main__":
    scrape_orsr()