# osci/crawlers/github/rest.py (99 lines of code) (raw)
"""Copyright since 2019, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from typing import NamedTuple, Optional, List, Any, Dict
import datetime
import requests
import logging
import time
from requests.structures import CaseInsensitiveDict
log = logging.getLogger(__name__)
class GithubArchiveRest(requests.Session):
    """HTTP session for downloading hourly event archives from `BASE_URL`."""
    BASE_URL = 'https://data.gharchive.org/'

    @staticmethod
    def _get_hour_file_name(date: datetime.datetime) -> str:
        """Build the archive file name for the hour of `date`

        The name has the form `<YYYY-MM-DD>-<hour>.json.gz`; the hour is written
        without zero padding (e.g. `2020-01-01-5.json.gz`).

        :param date: date and hour of the archive
        :return: archive file name
        """
        return f'{date.strftime("%Y-%m-%d")}-{date.hour}.json.gz'

    def _get_hour_url(self, file_name: str) -> str:
        """Generates archive file url

        :param file_name: archive file name
        :return: `BASE_URL` joined with the file name
        """
        return self.BASE_URL + file_name

    def get_hourly_events(self, date: datetime.datetime) -> Optional[bytes]:
        """Download the archive with events for the hour of `date`

        :param date: date and hour to load events for
        :return: raw response body (the requested file is a `.json.gz` archive) when the
            response status is 200, otherwise `None`
        """
        log.info(f'Load events for date: {date}')
        file_name = self._get_hour_file_name(date=date)
        response = self.get(self._get_hour_url(file_name=file_name))
        if response.status_code == 200:
            return response.content
        # Make the failure visible instead of silently falling through to an implicit None.
        log.warning(f'Failed to load events for date: {date}. '
                    f'Response status code: {response.status_code}')
        return None
class RequestsLimit(NamedTuple):
    """Rate-limit values read from the headers of a Github API response.

    Any field is `None` when its value is unknown or could not be parsed.
    """
    # Value of the `X-RateLimit-Limit` header
    requests_limit: Optional[int]
    # Value of the `X-RateLimit-Remaining` header
    requests_remaining: Optional[int]
    # Moment built from the timestamp in the `X-RateLimit-Reset` header
    limit_reset_time: Optional[datetime.datetime]
class GithubRest(requests.Session):
    """Github Rest Implementation

    Every request is sent with an `Authorization: token <token>` header, and the
    rate-limit headers of the latest response are stored in `self.limits`.
    When `wait_til_limits` is enabled, a 403 response makes the session sleep until
    the stored limit reset time and retry the request once.
    """
    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    DELETE = 'DELETE'

    base_url = 'https://api.github.com'
    repos_url = base_url + '/repos'

    def __init__(self, token: str, wait_til_limits: bool = True):
        """Github api rest session constructor

        :param token: github api access token
        :param wait_til_limits: wait reset limits time and retry request
        """
        super().__init__()
        self.token = token
        # Limits stay unknown until the first response has been received.
        self.limits = RequestsLimit(None, None, None)
        self.wait_til_limits = wait_til_limits

    def request(self, method, url, **kwargs) -> requests.Response:
        """Override request method to patch requests headers and parse response headers

        :param method: HTTP method name
        :param url: request url
        :param kwargs: extra arguments forwarded to `requests.Session.request`
        :return: response of the first attempt, or of the single retry performed after
            waiting for the limits reset
        """
        resp = self.__make_request(method=method, url=url, **kwargs)
        if self.wait_til_limits and resp.status_code == 403 and self.limits.limit_reset_time is not None:
            wait: datetime.timedelta = self.limits.limit_reset_time - datetime.datetime.now()
            log.warning(f'{method} response [{resp.status_code}] '
                        f'remaining_retries={self.limits.requests_remaining} '
                        f'Wait til {self.limits.limit_reset_time} ({wait}) '
                        f'url=`{url}`')
            # The reset time may already have passed, which makes `wait` negative;
            # time.sleep raises ValueError for negative values, so clamp to zero.
            time.sleep(max(0.0, wait.total_seconds()))
            log.debug(f"Retry making request to Github API method={method}, url={url}, kwargs={kwargs} "
                      f"after reset limits")
            resp = self.__make_request(method=method, url=url, **kwargs)
        return resp

    def __make_request(self, method, url, **kwargs) -> requests.Response:
        """Send a single request with the authorization header and refresh `self.limits`

        :param method: HTTP method name
        :param url: request url
        :param kwargs: extra arguments forwarded to `requests.Session.request`
        :return: received response
        """
        log.debug(f"Make request to Github API method={method}, url={url}, kwargs={kwargs}")
        # Build the outgoing arguments in a separate dict: `kwargs` is what gets logged,
        # so the access token must never be written into it.
        # `or {}` also covers an explicit `headers=None` passed by the caller.
        request_kwargs = dict(kwargs)
        request_kwargs['headers'] = {**(kwargs.get('headers') or {}),
                                     'Authorization': f'token {self.token}'}
        resp = super().request(method=method, url=url, **request_kwargs)
        self.limits = self.__get_limits(resp.headers)
        log.debug(f"Get response[{resp.status_code}] from Github API method={method}, url={url}, kwargs={kwargs}")
        return resp

    def _get_repo_url(self, repo_name):
        """Generates repository url"""
        return f'{self.repos_url}/{repo_name}'

    def _get_repo_events_url(self, repo_name):
        """Generates repository events url"""
        return f'{self._get_repo_url(repo_name)}/events'

    @staticmethod
    def __get_limits(headers) -> RequestsLimit:
        """Parse limits from response header

        :param headers: response headers (any mapping with a `get` method)
        :return: parsed limits; a field is `None` when its header is missing or malformed
        """

        def parse_int(value) -> Optional[int]:
            """Parse int if not None"""
            if value is None:
                # Header is absent: nothing to parse and nothing worth a warning.
                return None
            try:
                return int(value)
            except (TypeError, ValueError) as ex:
                log.warning(f'Error parse `{value}` to int. Exception: {ex}')
                return None

        def parse_datetime(value) -> Optional[datetime.datetime]:
            """Parse datetime if not None"""
            timestamp = parse_int(value)
            if timestamp is None:
                return None
            try:
                # Naive local time, so it is comparable with `datetime.datetime.now()` in `request`.
                return datetime.datetime.fromtimestamp(timestamp)
            except (OverflowError, OSError, ValueError) as ex:
                log.warning(f'Error parse `{timestamp}` to datetime. Exception: {ex}')
                return None

        return RequestsLimit(requests_limit=parse_int(headers.get('X-RateLimit-Limit')),
                             requests_remaining=parse_int(headers.get('X-RateLimit-Remaining')),
                             limit_reset_time=parse_datetime(headers.get('X-RateLimit-Reset')))

    def get_repository_events(self, repo_name: str) -> List[dict]:
        """Get events in repository from API

        :param repo_name: repository name
        :return: parsed JSON body when the response status is 200, otherwise an empty list
        """
        log.info(f'Get events in repository {repo_name}')
        resp = self.request(method=self.GET, url=self._get_repo_events_url(repo_name=repo_name))
        try:
            if resp.status_code == 200:
                return resp.json()
        except Exception as ex:
            # Best effort: a body that cannot be parsed is treated as an empty response.
            log.warning(f'Exception on parse events in repository {repo_name}: {ex}')
        log.warning(f'Empty response on events in repository {repo_name}')
        return []

    def get_repository(self, repo_name: str) -> Optional[Dict[str, Any]]:
        """Get repository information from API

        :param repo_name: repository name
        :return: parsed JSON body when the response status is 200, otherwise `None`
        """
        log.info(f'Get repository {repo_name} information')
        resp = self.request(method=self.GET, url=self._get_repo_url(repo_name=repo_name))
        try:
            if resp.status_code == 200:
                return resp.json()
        except Exception as ex:
            # Best effort: a body that cannot be parsed is treated as an empty response.
            log.warning(f'Exception on parse response on getting repository {repo_name}: {ex}')
        log.warning(f'Empty response on getting repository {repo_name}')
        return None