forestav committed on
Commit
6e54bce
·
0 Parent(s):

first commit

Browse files
Files changed (9) hide show
  1. .gitignore +2 -0
  2. app.py +199 -0
  3. bootstrap.py +28 -0
  4. get_ads.py +32 -0
  5. main.py +46 -0
  6. pinecone_handler.py +192 -0
  7. settings.py +41 -0
  8. time_handling.py +32 -0
  9. timestamp2.txt +1 -0
.gitignore ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ venv
2
+ __pycache__
app.py ADDED
@@ -0,0 +1,199 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import streamlit as st
2
+ import PyPDF2
3
+ import io
4
+ import docx2txt
5
+ from typing import Optional
6
+ import re
7
+ from pinecone_handler import PineconeHandler
8
+ from time_handling import read_timestamp
9
+
10
def extract_text_from_pdf(pdf_file) -> str:
    """Extract text content from a PDF file.

    Args:
        pdf_file: Binary file-like object containing the PDF.

    Returns:
        Concatenated text of all pages, one page per line.
    """
    pdf_reader = PyPDF2.PdfReader(pdf_file)
    text = ""
    for page in pdf_reader.pages:
        # extract_text() may return None/empty for pages without an
        # extractable text layer (e.g. scanned images); guard so we
        # don't raise TypeError on `None + "\n"`.
        text += (page.extract_text() or "") + "\n"
    return text
17
+
18
def extract_text_from_docx(docx_file) -> str:
    """Extract text content from DOCX file."""
    return docx2txt.process(docx_file)
22
+
23
def extract_resume_text(uploaded_file) -> Optional[str]:
    """Extract text from an uploaded resume file.

    Dispatches on the file extension (pdf / docx / doc / txt) and
    returns the extracted text, or None when no file was given, the
    format is unsupported, or extraction fails.
    """
    if uploaded_file is None:
        return None

    # Pick the handler based on the (lower-cased) file extension.
    extension = uploaded_file.name.rsplit('.', 1)[-1].lower()

    try:
        if extension == 'pdf':
            return extract_text_from_pdf(uploaded_file)
        if extension in ('docx', 'doc'):
            return extract_text_from_docx(uploaded_file)
        if extension == 'txt':
            return str(uploaded_file.read(), "utf-8")
        st.error(f"Unsupported file format: {extension}")
        return None
    except Exception as e:
        st.error(f"Error processing file: {str(e)}")
        return None
45
+
46
def clean_resume_text(text: str) -> str:
    """Collapse whitespace in resume text and trim the ends."""
    if not text:
        return ""
    # Any run of whitespace (newlines, tabs, repeated spaces) becomes
    # a single space; then strip leading/trailing whitespace.
    return re.sub(r'\s+', ' ', text).strip()
56
+
57
def is_description_truncated(description: str) -> bool:
    """Heuristically decide whether a job description was cut off."""
    # Near the 1000-character metadata storage limit.
    if len(description) >= 995:
        return True
    # Ends with an explicit ellipsis.
    if description.rstrip().endswith(('...', '…')):
        return True
    # Ends mid-word, i.e. on a word character with no terminal punctuation.
    if re.search(r'\w+$', description) and not re.search(r'[.!?]$', description):
        return True
    return False
67
+
68
def format_job_description(description: str, truncated: bool = False) -> str:
    """Format job description text with proper sections and line breaks.

    Args:
        description: Raw description text from the job ad metadata.
        truncated: If True, append an ellipsis to signal the text was cut.

    Returns:
        The description with section headers, bullet points and sentence
        boundaries separated by line breaks.
    """
    if not description:
        return ""

    # Common section headers in job descriptions
    sections = [
        "About us", "About you", "About the role", "About the position",
        "Requirements", "Qualifications", "Skills", "Responsibilities",
        "What you'll do", "What we offer", "Benefits", "Your profile",
        "Required skills", "What you need", "Who you are"
    ]

    # Add line breaks before section headers (case-insensitive).
    formatted_text = description
    for section in sections:
        pattern = re.compile(f'({section}:?)', re.IGNORECASE)
        formatted_text = pattern.sub(r'\n\n\1', formatted_text)

    # Handle bullet points. '•' is always a bullet; '-' is only a bullet
    # when it starts the text or follows whitespace AND is followed by
    # whitespace. The previous pattern r'[•-]\s*' also matched hyphens
    # inside words, mangling e.g. "co-founder" into "co\n• founder".
    formatted_text = re.sub(r'•\s*', '\n• ', formatted_text)
    formatted_text = re.sub(r'(?:^|(?<=\s))-\s+', '\n• ', formatted_text)

    # Add line breaks for sentences that look like list items
    formatted_text = re.sub(r'(?<=\w)\.(?=\s*[A-Z])', '.\n', formatted_text)

    # Clean up any excessive line breaks
    formatted_text = re.sub(r'\n{3,}', '\n\n', formatted_text)

    if truncated:
        formatted_text = formatted_text.rstrip() + "..."

    return formatted_text.strip()
101
+
102
+
103
+
104
def _render_job_card(match) -> None:
    """Render one search result (a Pinecone match) as a Streamlit card."""
    metadata = match.metadata
    score = match.score

    with st.container():
        # Header section with key information
        col1, col2 = st.columns([2, 1])
        with col1:
            st.markdown(f"### {metadata.get('headline', 'Untitled position')}")
        with col2:
            st.markdown(f"**Match Score:** {score:.2f}")

        # Job details section. Use .get() with defaults throughout:
        # index entries are not guaranteed to carry every metadata key,
        # and a direct metadata['...'] lookup would raise KeyError.
        st.markdown(f"**Company:** {metadata.get('company', 'Not specified')}")
        st.markdown(f"**Location:** {metadata.get('city', 'Not specified')}")
        st.markdown(f"**Occupation:** {metadata.get('occupation', 'Not specified')}")
        st.markdown(f"**Published:** {metadata.get('published', 'Unknown')}")
        if metadata.get('logo_url'):
            st.image(metadata['logo_url'], width=100)

        # Check if description is truncated
        description = metadata.get('description', '')
        is_truncated = is_description_truncated(description)

        # Display initial description preview
        formatted_description = format_job_description(
            description[:500] if is_truncated else description,
            truncated=is_truncated
        )
        st.markdown(formatted_description)

        # If truncated, point the user at the original posting
        if is_truncated:
            with st.expander("Read Full Description"):
                st.markdown("""
                **Note:** The full description has been truncated in our database.
                Please visit the original job posting for complete details.
                """)
                if metadata.get('webpage_url'):
                    st.markdown(f"[View Original Job Posting]({metadata['webpage_url']})")

        # Application section
        st.markdown("### How to Apply")
        if metadata.get('webpage_url'):
            st.markdown(f"[Apply Online]({metadata['webpage_url']})")
        if metadata.get('email'):
            st.markdown(f"📧 Contact: {metadata['email']}")

        st.markdown("---")


def main():
    """Streamlit entry point: upload a resume and search Pinecone for jobs."""
    st.title("Resume-Based Job Search")
    st.write("Upload your resume to find matching job opportunities")

    # Initialize PineconeHandler
    try:
        handler = PineconeHandler()
    except Exception as e:
        st.error(f"Error connecting to Pinecone: {str(e)}")
        return

    # File uploader
    uploaded_file = st.file_uploader("Upload your resume", type=['pdf', 'docx', 'doc', 'txt'])

    # Search parameters
    num_results = st.slider("Number of results", min_value=1, max_value=20, value=5)

    if not uploaded_file:
        return

    with st.spinner("Processing resume..."):
        resume_text = extract_resume_text(uploaded_file)
    if not resume_text:
        return

    clean_text = clean_resume_text(resume_text)

    # Preview extracted text
    with st.expander("Preview extracted text"):
        st.text(clean_text[:500] + "..." if len(clean_text) > 500 else clean_text)

    if not st.button("Search Jobs"):
        return

    with st.spinner("Searching for matching jobs..."):
        try:
            # Search for similar job ads
            results = handler.search_similar_ads(clean_text, top_k=num_results)
            if results:
                st.subheader("Matching Jobs")
                for match in results:
                    _render_job_card(match)
            else:
                st.info("No matching jobs found. Try adjusting your search criteria.")
        except Exception as e:
            st.error(f"Error searching jobs: {str(e)}")

if __name__ == "__main__":
    main()
bootstrap.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import sys
import logging
import time_handling
from get_ads import get_all_ads
from pinecone_handler import PineconeHandler, load_all

from settings import LOG_LEVEL, LOG_DATE_FORMAT, LOG_FORMAT, PLACES, OCCUPATIONS

log = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

if __name__ == '__main__':
    # This is executed once to initialize the Pinecone database and
    # load all ads into it. To keep the database updated, run main.py.

    # Initialize Pinecone handler (creates the index if it is missing)
    handler = PineconeHandler()
    log.info('Pinecone connection initialized')

    if PLACES or OCCUPATIONS:
        # If filtering by location/occupation, set past timestamp
        timestamp = time_handling.write_timestamp('2022-01-01T00:00:00')
    else:
        timestamp = time_handling.write_timestamp()
    all_ads = get_all_ads()
    load_all(all_ads)
    # Fixed message: previously read "Loaded {n} into Pinecone" (missing "ads").
    log.info(f'Loaded {len(all_ads)} ads into Pinecone. Timestamp: {timestamp}')
get_ads.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+ import json
3
+ import logging
4
+ import requests
5
+
6
+ from settings import LOG_LEVEL, LOG_DATE_FORMAT, LOG_FORMAT, STREAM_URL, SNAPSHOT_URL, PLACES, OCCUPATIONS
7
+
8
+ log = logging.getLogger(__name__)
9
+ logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
10
+
11
+
12
def _get(url, params=None):
    """GET *url* with a JSON Accept header and return the decoded payload.

    Args:
        url: Endpoint to query.
        params: Optional dict of query-string parameters.

    Returns:
        The parsed JSON response (a list of ads).

    Raises:
        requests.HTTPError: If the response status is not 2xx.
    """
    # A mutable default argument ({}) is shared across calls; use the
    # None-sentinel idiom instead.
    params = {} if params is None else params
    log.info(f'Collecting ads from: {url} with params {params}')
    headers = {'Accept': 'application/json'}
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    # response.json() replaces the manual json.loads(content.decode('utf8')).
    list_of_ads = response.json()
    log.info(f"Got {len(list_of_ads)} ads from {url}. Params: {params}")
    return list_of_ads
20
+
21
+
22
def get_all_ads():
    """Return a full snapshot of all currently published ads."""
    return _get(SNAPSHOT_URL)
24
+
25
+
26
def get_ads_since_time(timestamp):
    """Return ads changed after *timestamp*, honoring configured filters."""
    query = {'date': timestamp}
    # Only attach the filter parameters that are actually configured.
    for key, value in (('location-concept-id', PLACES),
                       ('occupation-concept-id', OCCUPATIONS)):
        if value:
            query[key] = value
    return _get(STREAM_URL, query)
main.py ADDED
@@ -0,0 +1,46 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+ from time import sleep
3
+ import logging
4
+
5
+ from pinecone_handler import PineconeHandler
6
+ import get_ads
7
+ from time_handling import timestamp_now, write_timestamp, read_timestamp
8
+
9
+ from settings import LOG_LEVEL, LOG_DATE_FORMAT, LOG_FORMAT, MAX_UPDATES, SLEEP_TIME_MINUTES
10
+
11
+ log = logging.getLogger(__name__)
12
+ logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
13
+
14
def keep_updated():
    """Poll the job stream and upsert new ads until MAX_UPDATES cycles finish.

    Each cycle fetches ads published after the last seen timestamp,
    upserts them into Pinecone, persists the new timestamp, and sleeps
    SLEEP_TIME_MINUTES before the next cycle.
    """
    handler = PineconeHandler()
    last_timestamp = read_timestamp()
    counter = 0

    while True:
        new_timestamp = timestamp_now()
        log.info(f"Getting ads after timestamp '{last_timestamp}'")

        if ads := get_ads.get_ads_since_time(last_timestamp):
            handler.upsert_ads(ads)
        else:
            log.info(f"No ads found after timestamp '{last_timestamp}'")

        write_timestamp(new_timestamp)
        # Advance the in-memory cursor as well. Previously only the file
        # was updated, so every cycle re-fetched all ads since the
        # initial timestamp instead of just the new ones.
        last_timestamp = new_timestamp
        counter += 1
        log.info(f"Completed update {counter} of {MAX_UPDATES}")

        if counter == MAX_UPDATES:
            break

        log.info(f"Waiting {SLEEP_TIME_MINUTES} minutes before collecting ads again")
        sleep(SLEEP_TIME_MINUTES * 60)

    log.info('Finished')
39
+
40
if __name__ == '__main__':
    # Important: you must run bootstrap.py first to initialize Pinecone
    # and load current ads into the database (if applicable).
    keep_updated()
pinecone_handler.py ADDED
@@ -0,0 +1,192 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+ import sys
3
+ import logging
4
+ from pinecone import Pinecone, ServerlessSpec
5
+ from sentence_transformers import SentenceTransformer
6
+ from typing import List, Dict, Any
7
+
8
+ from settings import (
9
+ LOG_LEVEL,
10
+ LOG_DATE_FORMAT,
11
+ LOG_FORMAT,
12
+ PINECONE_API_KEY,
13
+ PINECONE_ENVIRONMENT,
14
+ PINECONE_INDEX_NAME
15
+ )
16
+
17
+ log = logging.getLogger(__name__)
18
+ logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
19
+
20
class PineconeHandler:
    """
    Handles connections and operations with Pinecone vector database
    for storing and retrieving job ads
    """

    def __init__(self):
        # Number of vectors sent per upsert request.
        self.BATCH_SIZE = 100
        self.pc = Pinecone(api_key=PINECONE_API_KEY)

        try:
            self.index = self.pc.Index(PINECONE_INDEX_NAME)
            log.info(f"Connected to existing index '{PINECONE_INDEX_NAME}'")
        except Exception:
            # Index lookup failed -- create a serverless index and retry.
            log.info(f"Creating new index '{PINECONE_INDEX_NAME}'")
            self.pc.create_index(
                name=PINECONE_INDEX_NAME,
                dimension=384,  # embedding size of all-MiniLM-L6-v2
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-west-2"),
            )
            self.index = self.pc.Index(PINECONE_INDEX_NAME)

        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        log.info(f"Initialized connection to Pinecone index '{PINECONE_INDEX_NAME}'")

    def _create_embedding(self, ad: Dict[str, Any]) -> List[float]:
        """Create embedding from job ad text"""
        try:
            # Pull the text fields defensively; each may be missing or None.
            occupation = ad.get('occupation', {})
            description = ad.get('description', {})
            parts = (
                ad.get('headline', '') or '',
                occupation.get('label', '') if occupation else '',
                description.get('text', '') if description else '',
            )
            text_to_embed = " ".join(parts).strip()

            if not text_to_embed:
                raise ValueError("No text content available for embedding")

            return self.model.encode(text_to_embed).tolist()
        except Exception as e:
            log.error(f"Error creating embedding for ad {ad.get('id', 'unknown')}: {str(e)}")
            raise

    def _prepare_metadata(self, ad: Dict[str, Any]) -> Dict[str, str]:
        """Extract metadata from ad for storage"""
        try:
            def clip(value, limit):
                # Treat None as empty and cap the field length.
                return (value or '')[:limit]

            # Nested sub-dicts may be absent or explicitly None.
            application_details = ad.get('application_details', {}) or {}
            workplace_address = ad.get('workplace_address', {}) or {}
            occupation = ad.get('occupation', {}) or {}
            description = ad.get('description', {}) or {}

            return {
                'email': clip(application_details.get('email', ''), 100),
                'city': clip(workplace_address.get('municipality', ''), 100),
                'occupation': clip(occupation.get('label', ''), 100),
                'headline': clip(ad.get('headline', ''), 200),
                'description': clip(description.get('text', ''), 1000),
                'logo_url': clip(ad.get('logo_url', ''), 200),
                'webpage_url': clip(ad.get('webpage_url', ''), 200),
                'published': clip(ad.get('publication_date', ''), 50),
            }
        except Exception as e:
            log.error(f"Error preparing metadata for ad {ad.get('id', 'unknown')}: {str(e)}")
            raise

    def _batch_upsert(self, vectors: List[tuple]) -> None:
        """
        Upsert a batch of vectors to Pinecone

        Args:
            vectors: List of tuples, each containing (id, vector, metadata)
        """
        try:
            # Pinecone requires string IDs.
            payload = [(str(ad_id), embedding, meta) for ad_id, embedding, meta in vectors]
            self.index.upsert(vectors=payload)
            log.debug(f"Successfully upserted batch of {len(vectors)} vectors")
        except Exception as e:
            log.error(f"Error upserting batch: {str(e)}")
            raise

    def upsert_ads(self, ads: List[Dict[str, Any]]) -> None:
        """Insert or update multiple ads in batches"""
        pending = []
        deleted = 0
        processed = 0
        skipped = 0

        for ad in ads:
            try:
                if not ad:
                    log.warning("Skipping None or empty ad")
                    skipped += 1
                    continue

                ad_id = ad.get('id')
                if not ad_id:
                    log.warning("Skipping ad without ID")
                    skipped += 1
                    continue

                # Ads flagged as removed are deleted instead of upserted.
                if ad.get('removed', False):
                    self.delete_ad(ad_id)
                    deleted += 1
                    continue

                try:
                    pending.append((ad_id, self._create_embedding(ad), self._prepare_metadata(ad)))
                    processed += 1
                    # Flush a full batch.
                    if len(pending) >= self.BATCH_SIZE:
                        self._batch_upsert(pending)
                        pending = []
                except Exception as e:
                    log.error(f"Error processing ad {ad_id}: {str(e)}")
                    skipped += 1
            except Exception as e:
                log.error(f"Unexpected error processing ad: {str(e)}")
                skipped += 1

        # Flush any remaining vectors.
        if pending:
            self._batch_upsert(pending)

        log.info(f"Processing complete: {processed} ads upserted, {deleted} deleted, {skipped} skipped")

    def delete_ad(self, ad_id: str) -> None:
        """Delete an ad by ID"""
        try:
            self.index.delete(ids=[ad_id])
            log.debug(f"Deleted ad {ad_id} from Pinecone")
        except Exception as e:
            # Best-effort: deletion failures are logged, not raised.
            log.error(f"Error deleting ad {ad_id}: {str(e)}")

    def search_similar_ads(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Search for similar job ads based on text query"""
        response = self.index.query(
            vector=self.model.encode(query).tolist(),
            top_k=top_k,
            include_metadata=True,
        )
        return response.matches
183
+
184
def load_all(all_ads):
    """Load a full ad snapshot into Pinecone via a fresh handler."""
    PineconeHandler().upsert_ads(all_ads)
187
+
188
def update(list_of_updated_ads):
    """Upsert a batch of updated ads and log how long it took."""
    started_at = datetime.now()
    PineconeHandler().upsert_ads(list_of_updated_ads)
    elapsed = datetime.now() - started_at
    log.info(f"{len(list_of_updated_ads)} ads processed. Time: {elapsed}")
settings.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import logging
import os

# Pinecone configuration. The API key is read from the environment so
# the secret is never committed to version control -- the original file
# embedded the key literally; that key should be rotated if it was pushed.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
PINECONE_ENVIRONMENT = "gcp-starter"
PINECONE_INDEX_NAME = "jobads-index"

DB_TABLE_NAME = 'jobads'
DB_FILE_NAME = 'jobads_database_20220127.db'
TIMESTAMP_FILE = 'timestamp2.txt'

# JobTech JobStream API endpoints
BASE_URL = 'https://jobstream.api.jobtechdev.se'
STREAM_URL = f"{BASE_URL}/stream"
SNAPSHOT_URL = f"{BASE_URL}/snapshot"

SLEEP_TIME_MINUTES = 0.1
MAX_UPDATES = 4

DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

# Logging
LOG_LEVEL = logging.INFO  # Change INFO to DEBUG for verbose logging
LOG_FORMAT = '%(asctime)s %(levelname)-8s %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

"""
Examples for the municipalities in Västerbottens Län:
Skellefteå - kicB_LgH_2Dk
Robertsfors - p8Mv_377_bxp
Norsjö - XmpG_vPQ_K7T
Vindeln - izT6_zWu_tta
Umeå - QiGt_BLu_amP
Vännäs - utQc_6xq_Dfm
"""

# if you don't want to do geographical filtering, set PLACES = []
#PLACES = ['kicB_LgH_2Dk', 'p8Mv_377_bxp', 'XmpG_vPQ_K7T', 'izT6_zWu_tta', 'QiGt_BLu_amP', 'utQc_6xq_Dfm']
PLACES = []

# if you don't want to do filtering on occupations, set OCCUPATIONS = []
#OCCUPATIONS = ['Z6TY_xDf_Yup'] # Städare
OCCUPATIONS = []
time_handling.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+
3
+ from datetime import datetime
4
+ import logging
5
+
6
+ from settings import LOG_LEVEL, LOG_DATE_FORMAT, LOG_FORMAT, DATE_FORMAT, TIMESTAMP_FILE
7
+
8
+ log = logging.getLogger(__name__)
9
+ logging.basicConfig(stream=sys.stdout, level=LOG_LEVEL, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
10
+
11
+
12
def elapsed_time(start):
    """Return the timedelta between now and *start*."""
    now = datetime.now()
    return now - start
14
+
15
+
16
def timestamp_now():
    """Return the current local time formatted per settings.DATE_FORMAT."""
    current = datetime.now()
    return current.strftime(DATE_FORMAT)
18
+
19
+
20
def write_timestamp(timestamp=None):
    """Persist *timestamp* (defaults to now) to TIMESTAMP_FILE and return it."""
    # Any falsy value (None, empty string) falls back to the current time.
    stamp = timestamp or timestamp_now()
    with open(TIMESTAMP_FILE, 'w') as f:
        f.write(stamp)
    log.info(f"New timestamp written: {stamp}")
    return stamp
27
+
28
+
29
def read_timestamp():
    """Return the timestamp stored in TIMESTAMP_FILE, minus trailing newlines."""
    with open(TIMESTAMP_FILE) as f:
        return f.read().strip('\n')
timestamp2.txt ADDED
@@ -0,0 +1 @@
 
 
1
+ 2024-12-22T13:55:53