import re
import unicodedata
from concurrent.futures import ThreadPoolExecutor

import requests
from bs4 import BeautifulSoup
from pymongo import InsertOne
from tqdm.auto import tqdm

from app.config import settings
from app.db import connect_db, disconnect_db

# Timeout (seconds) for outgoing HTTP requests so a stalled connection
# cannot hang a worker thread forever.
REQUEST_TIMEOUT = 30

# variables for custom data parsing:
# record keys that are expected to hold exactly one active value
single_value = ["Obchodné meno:", "Sídlo:", "IČO:", "Deň zápisu:", "Právna forma:"]

# record keys whose values need a dedicated parser in process_entry();
# everything else is parsed as plain text
value_type_dict = {
    "IČO:": "number",
    "Spoločníci:": "spolocnici",
    "Výška vkladu každého spoločníka:": "vklad",
}


def _get_proxies(verbose=False):
    """
    Build the ``proxies`` mapping for requests from the configured settings.

    :param verbose: when True, print which proxies were found
    :return: dict suitable for ``requests.get(..., proxies=...)``
    """
    proxies = {}
    if (pr := settings["http_proxy"]) is not None:
        proxies["http"] = pr
        if verbose:
            print(f"Found http proxy: {pr}")
    if (pr := settings["https_proxy"]) is not None:
        proxies["https"] = pr
        if verbose:
            print(f"Found https proxy: {pr}")
    return proxies


def scrape_orsr():
    """
    This is the main function that scrapes data from endpoint defined in config
    and stores it in mongodb.

    Downloads the list of changed records, asks the user which record type to
    fetch ('Aktuálny' or 'Úplný'), then distributes the record urls round-robin
    over the number of worker threads defined in config.
    """
    print("#########################")
    print("Starting ORSR scraper")

    # get all links to from the orsr url
    print("Downloading changed records..")
    url = settings["base_url"] + settings["endpoint"]
    proxies = _get_proxies(verbose=True)
    html = requests.get(url, proxies=proxies, timeout=REQUEST_TIMEOUT)
    print("All changed records downloaded.")

    # use bs4 to parse the page
    soup = BeautifulSoup(html.content, "html.parser")

    # choice between Aktualny and Uplny
    m_type = input("Choose which type of records do you want to download:\n[1] 'Aktuálne'\n[2] 'Úplné' (default)\n")
    if m_type == "1":
        record_type = "Aktuálny"
        print("Record type is 'Aktuálny'")
    else:
        record_type = "Úplný"
        print("Record type is 'Úplný'")

    records = soup.find_all("a", string=record_type)
    # add base_url to href links
    records = [settings["base_url"] + record["href"] for record in records]
    print(f"There were {len(records)} records found.")

    # distribute the work round-robin into #of threads defined in config
    parts = [records[i::settings["threads"]] for i in range(settings["threads"])]
    print(f"Processing {len(records)} records using {settings['threads']} threads:")
    # size the pool to match the number of partitions, otherwise the executor
    # default may run a different number of workers than configured
    with ThreadPoolExecutor(max_workers=settings["threads"]) as t:
        for thread_id, part in enumerate(parts):
            t.submit(process_records, part, thread_id + 1)
    print("All records processed")
    print("Closing ORSR Scraper...")
    print("#########################")


def process_records(records, thread):
    """
    worker for processing records in a thread

    :param records: list of urls of records to proceses
    :param thread: thread id of processing thread (shown in the progress bar)
    """
    data = []
    # status bar for processing the records assigned to this thread
    for record_url in tqdm(records, desc=f"thread {thread}"):
        try:
            record = process_record(record_url)
            data.append(InsertOne(record))
        except Exception as e:
            # best-effort: log the failed record and keep going
            print(f"When downloading and parsing record {record_url} following error occurred: {e}")

    if not data:
        # bulk_write() raises InvalidOperation on an empty request list,
        # so skip the db round-trip when every record failed
        return

    # store processed records in db
    collection = connect_db()
    collection.bulk_write(data)
    disconnect_db(collection)


def process_record(url):
    """
    process one record. Scrape url data and parse them to dictionary

    :param url: url of the record
    :return: dictionary of parameters
    """
    proxies = _get_proxies()
    html = requests.get(url, proxies=proxies, timeout=REQUEST_TIMEOUT)
    soup = BeautifulSoup(html.content, "html.parser")
    return get_record_data(soup)


def get_oddiel(soup):
    """
    Helper function to get Oddiel

    :param soup: website data
    :return: dictionary with value: oddiel
    """
    oddiel = soup.find("span", class_="tl", string=re.compile("Oddiel:")).parent.find("span", class_="ra").text.strip()
    return {"value": oddiel}


def get_vlozka(soup):
    """
    Helper function to get Vložka

    :param soup: website data
    :return: dictionary with value: vlozka
    """
    vlozka = soup.find("span", class_="tl", string=re.compile("Vložka")).parent.find("span", class_="ra").text.strip()
    return {"value": vlozka}


def get_aktualizaciaUdajov(soup):
    """
    Helper function to get the date of "Dátum aktualizácie údajov"

    :param soup: website data
    :return: dictionary with value: aktualizacia
    """
    aktualizacia = soup.find("td", class_="tl", string=re.compile("Dátum aktualizácie")).find_next_sibling("td").text.strip()
    return {"value": aktualizacia}


def get_vypisUdajov(soup):
    """
    Helper function to get the date of "Dátum výpisu"

    :param soup: website data
    :return: dictionary with value: vypis
    """
    vypis = soup.find("td", class_="tl", string=re.compile("Dátum výpisu")).find_next_sibling("td").text.strip()
    return {"value": vypis}


def get_data(data_td, value_type="text", allow_multiple_active=True):
    """
    Generic function to retrieve data for one key

    :param data_td: td-element containing the data
    :param value_type: type of value that we want to retrieve. Default value is
        "text"; other values are defined in value_type_dict
    :param allow_multiple_active: if multiple active values are allowed, then a
        list of active values is returned, instead of single items
    :return: dictionary of data for the entry
    """
    data = {}
    values = []      # currently valid entries
    old_values = []  # historical (inactive) entries

    # each entry for the key is rendered as its own nested table
    for entry in data_td.find_all("table"):
        value, valid_from, valid_until, active = process_entry(entry, value_type)
        if value is None:
            continue
        item = {"value": value, "valid_from": valid_from, "valid_until": valid_until}
        if active:
            values.append(item)
        else:
            old_values.append(item)

    if not allow_multiple_active:
        # single-value key: flatten the first active entry into the result
        if len(values) > 0:
            data.update(values[0])
    else:
        data.update({"values": values})
    data.update({"old_values": old_values})
    return data


def get_record_data(soup):
    """
    Retrieve data for one record

    :param soup: souped-html for the record
    :return: dictionary with record data
    """
    record = {
        "oddiel": get_oddiel(soup),
        "vlozka": get_vlozka(soup),
    }

    # find the last table before variable data
    entry = soup.find("span", class_="tl", string=re.compile("Oddiel:")).parent.parent.parent

    # retrieve all keys for a record. Since there are multiple different record
    # types with different keys, the keys of the record are created
    # automatically from available data
    while True:
        entry = entry.find_next_sibling("table")
        entry_tr = entry.find_all("tr")
        # keep only direct rows of this table (drop rows of nested tables)
        entry_tr = [i for i in entry_tr if i.parent == entry_tr[0].parent]
        if len(entry_tr) > 1:
            # last table with "Dátum aktualizácie údajov"
            break

        # get key name and key data
        key_container = entry_tr[0].find_all("td")
        key_name = key_container[0].text.strip()

        # check if multiple active allowed and the value_type
        allow_multiple_active = True
        value_type = "text"
        if key_name in single_value:
            allow_multiple_active = False
        if (v_type := value_type_dict.get(key_name)) is not None:
            value_type = v_type
        key_name = transform_key_name(key_name)

        # reads the data of the key
        key_data = get_data(key_container[1], value_type=value_type, allow_multiple_active=allow_multiple_active)
        record.update({key_name: key_data})

    record.update({
        "aktualizaciaUdajov": get_aktualizaciaUdajov(soup),
        "vypisUdajov": get_vypisUdajov(soup),
    })
    return record


def transform_key_name(name):
    """
    Helper function to create camelCase key name

    :param name: string with input data (from ORSR)
    :return: camelCase key name
    """
    # strip diacritics (NFKD + ascii-ignore), drop the colon, lowercase, split
    words = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode().replace(":", "").lower().split()
    return words[0] + "".join(w.capitalize() for w in words[1:])


def process_entry(entry, value_type):
    """
    extracts one entry from the table of entries for a given data

    :param entry: one table element of data
    :param value_type: type of the value data
    :return: tuple: (value, valid_from, valid_until, active)
    """
    value, valid_from, valid_until, active = None, None, None, False
    value_td, valid_td = entry.find_all("td")

    # entries styled with the "ra" class are the currently active ones
    if value_td.span.attrs["class"][0] == "ra":
        active = True

    # get clean lines from multiline entries: <br> marks a line break,
    # <span> fragments on the same line are joined with spaces
    lines = [f.strip() for f in " ".join(["\n" if x.name == "br" else x.text.strip() for x in value_td.find_all(["br", "span"])]).split("\n") if f]

    # parse data according to value_type
    if value_type == "text":
        value = ", ".join(lines)
    elif value_type == "number":
        value = int("".join(lines).replace(" ", ""))
    elif value_type == "spolocnici":
        # first line is the partner's name, the rest is the address
        value = {
            "spolocnik": lines[0],
            "adresa": ", ".join(lines[1:]),
        }
    elif value_type == "vklad":
        # first line is the partner's name, the rest is the deposit
        value = {
            "spolocnik": lines[0],
            "vklad": ", ".join(lines[1:]),
        }

    valid_from, valid_until = parse_oddo(valid_td.text.strip())
    return value, valid_from, valid_until, active


def parse_oddo(text):
    """
    Parses the valid_from and valid_until from string

    :param text: od_do_dates in format: "(od: DD.MM.YYYY do: DD.MM.YYYY)"
    :return: returns a tuple (valid_from, valid_until); missing dates yield ""
    """
    valid_from, valid_until = "", ""
    # dates are fixed-width (DD.MM.YYYY = 10 chars) after the markers
    if (start_from := text.find("od: ")) > -1:
        valid_from = text[start_from + 4:start_from + 14]
    if (start_until := text.find("do: ")) > -1:
        valid_until = text[start_until + 4:start_until + 14]
    return valid_from, valid_until


if __name__ == "__main__":
    scrape_orsr()