import csv
import json
import logging
import os
from typing import List, Optional

from google.cloud import bigquery
from google.oauth2 import service_account
import google.api_core.exceptions

from gatherers.gathererabc import Gatherer
from utils import utils

# Options:
#
# --timeout: Override the 10 minute job timeout (specify in seconds).
# --cache: Use locally cached export data instead of hitting BigQuery.

# Gathers hostnames from Censys.io via the Google BigQuery API.
#
# Before using this, you need to:
#
# * create a Project in Google Cloud, and an associated service account
#   with access to create new jobs/queries and get their results.
# * give Censys.io this Google Cloud service account to grant access to.
#
# For details on concepts, and how to test access in the web console:
#
# * https://support.censys.io/google-bigquery/bigquery-introduction
# * https://support.censys.io/google-bigquery/adding-censys-datasets-to-bigquery
#
# Note that web console access is based on access given to a Google account,
# but BigQuery API access via this script depends on access given to
# Google Cloud *service account* credentials.

# Default to a 10 minute job timeout (in seconds).
default_timeout = 60 * 10


class Gatherer(Gatherer):

    def gather(self):
        # Returns a parsed, processed Google service credentials object.
        credentials = load_credentials()

        if credentials is None:
            logging.warning("No BigQuery credentials provided.")
            logging.warning("Set the BIGQUERY_CREDENTIALS or BIGQUERY_CREDENTIALS_PATH environment variable.")
            exit(1)

        # When using this form of instantiation, the client won't pull
        # the project_id out of the credentials; it has to be set explicitly.
        client = bigquery.Client(
            project=credentials.project_id,
            credentials=credentials
        )

        # Allow override of the default timeout (in seconds).
        timeout = int(self.options.get("timeout", default_timeout))

        # Construct the query.
        query = query_for(self.suffixes)
        logging.debug("Censys query:\n%s\n" % query)

        # Plan to store in cache/censys/export.csv.
        download_path = utils.cache_path(
            "export", "censys", ext="csv", cache_dir=self.cache_dir)

        # Reuse of cached data can be turned on with --cache.
        cache = self.options.get("cache", False)
        if (cache is True) and os.path.exists(download_path):
            logging.warning("Using cached download data.")

        # But by default, fetch new data from the BigQuery API,
        # and write it to the expected download location.
        else:
            # Ensure the cache destination exists.
            utils.mkdir_p(os.path.dirname(download_path))

            logging.warning("Kicking off SQL query job.")

            rows = None

            # Actually execute the query.
            try:
                # Executes the query and loads all results into memory.
                query_job = client.query(query)
                iterator = query_job.result(timeout=timeout)
                rows = list(iterator)
            except google.api_core.exceptions.Forbidden:
                logging.warning("Access denied to Censys' BigQuery tables.")
                exit(1)
            except Exception:
                logging.warning(utils.format_last_exception())
                logging.warning("Error talking to BigQuery, aborting.")
                exit(1)

            # At this point, the job is complete and all result rows are in
            # memory. Cache them to disk as a CSV.
            logging.warning("Caching results of SQL query.")

            download_file = open(download_path, 'w', newline='')
            download_writer = csv.writer(download_file)
            download_writer.writerow(["Domain"])  # will be skipped on read
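
            # Illustrative shape of a returned row (an assumption, for
            # clarity): rows support mapping-style access by field name, and
            # the repeated certificate fields come back as Python lists, e.g.
            #   row['common_name'] -> ["example.gov"]
            #   row['dns_names']   -> ["example.gov", "www.example.gov"]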
            # Parse the rows and write them out as they were returned (dupes
            # and all), to be de-duped by the central gathering script.
            for row in rows:
                domains = row['common_name'] + row['dns_names']
                for domain in domains:
                    download_writer.writerow([domain])

            # End CSV writing.
            download_file.close()

        # Whether we downloaded it fresh or not, read from the cached data.
        for domain in utils.load_domains(download_path):
            if domain:
                yield domain


# Constructs the query to run in BigQuery, against Censys'
# certificate datasets, for one or more suffixes.
#
# Example query:
#
# SELECT
#   parsed.subject.common_name,
#   parsed.extensions.subject_alt_name.dns_names
# FROM
#   `censys-io.certificates_public.certificates`,
#   UNNEST(parsed.subject.common_name) AS common_names,
#   UNNEST(parsed.extensions.subject_alt_name.dns_names) AS sans
# WHERE
#   (common_names LIKE "%.gov"
#   OR sans LIKE "%.gov")
#   OR (common_names LIKE "%.fed.us"
#   OR sans LIKE "%.fed.us");
def query_for(suffixes: List[str]) -> str:
    select = "\n".join([
        "  parsed.subject.common_name,",
        "  parsed.extensions.subject_alt_name.dns_names",
    ])

    from_clause = "\n".join([
        "  `censys-io.certificates_public.certificates`,",
        "  UNNEST(parsed.subject.common_name) AS common_names,",
        "  UNNEST(parsed.extensions.subject_alt_name.dns_names) AS sans",
    ])

    # Returns the query fragment for a specific suffix.
    def suffix_query(suffix):
        return "\n".join([
            "(common_names LIKE \"%%%s\"" % suffix,
            "  OR sans LIKE \"%%%s\")" % suffix,
        ])

    # Join the individual suffix clauses into one WHERE clause.
    where = "\n  OR ".join([suffix_query(suffix) for suffix in suffixes])

    query = "\n".join([
        "SELECT",
        select,
        "FROM",
        from_clause,
        "WHERE",
        "  %s" % where
    ])

    return query


# Read credentials from an environment variable directly, or from a file
# whose path is given in another environment variable. Returns None if
# neither is set.
def get_credentials_from_env_var_or_file(env_var: str = "",
                                         env_file_var: str = "") -> Optional[str]:
    creds = os.environ.get(env_var, None)
    if creds is None:
        path = os.environ.get(env_file_var, None)
        if path is not None:
            with open(path) as f:
                creds = f.read()
    return creds


# Load BigQuery credentials from either a JSON string, or a JSON file.
# Passed in via environment variables either way.
def load_credentials():
    creds = get_credentials_from_env_var_or_file(
        env_var="BIGQUERY_CREDENTIALS",
        env_file_var="BIGQUERY_CREDENTIALS_PATH")
    if creds is None:
        return None
    parsed = json.loads(creds)
    return service_account.Credentials.from_service_account_info(parsed)
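

# A minimal sketch, not part of the gatherer's normal flow: running this module
# directly (e.g. `python -m gatherers.censys` from the repository root, assuming
# the root is on the path) just previews the generated SQL for a couple of
# example suffixes, without creating a BigQuery client or contacting Censys.
# The suffixes below are illustrative only.
if __name__ == "__main__":
    print(query_for([".gov", ".fed.us"]))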