高效处理定时数据抓取、去重和缺失数据是数据工程中的常见任务。以下是一个系统化的解决方案,涵盖数据抓取、去重和缺失数据处理的关键步骤:
使用自动化工具或框架定期抓取数据,确保数据的及时性和一致性。
requests
或 Scrapy
抓取数据。APScheduler
或 Celery
设置定时任务。from apscheduler.schedulers.blocking import BlockingScheduler
import requests
def fetch_data():
url = "https://example.com/api/data"
response = requests.get(url)
data = response.json()
# 保存数据到文件或数据库
with open("data.json", "w") as f:
f.write(data)
scheduler = BlockingScheduler()
scheduler.add_job(fetch_data, 'interval', hours=1) # 每小时抓取一次
scheduler.start()
在数据抓取后,确保数据集中没有重复记录。
id
),可以使用数据库的 UNIQUE
约束或 Python 的 set
去重。import pandas as pd
# 假设数据已加载到 DataFrame
data = pd.read_json("data.json")
# 基于唯一标识符去重
data.drop_duplicates(subset=["id"], keep="first", inplace=True)
# 基于内容去重(生成哈希值)
data["hash"] = data.apply(lambda row: hash(tuple(row)), axis=1)
data.drop_duplicates(subset=["hash"], keep="first", inplace=True)
处理数据中的缺失值,确保数据的完整性和可用性。
# 删除缺失值
data.dropna(inplace=True)
# 填充缺失值
data["column_name"].fillna(data["column_name"].mean(), inplace=True) # 均值填充
data["column_name"].fillna(method="ffill", inplace=True) # 前向填充
data["column_name"].fillna(method="bfill", inplace=True) # 后向填充
将处理后的数据存储到数据库或文件中,便于后续分析和使用。
import sqlite3
# 创建数据库连接
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS data (
id INTEGER PRIMARY KEY,
column1 TEXT,
column2 REAL
)
""")
# 插入数据
data.to_sql("data", conn, if_exists="replace", index=False)
# 关闭连接
conn.close()
确保数据抓取和处理过程的稳定性和可追溯性。
logging
模块记录抓取和处理过程中的关键信息。import logging
logging.basicConfig(filename="data_pipeline.log", level=logging.INFO)
try:
fetch_data()
logging.info("Data fetched successfully")
except Exception as e:
logging.error(f"Error fetching data: {e}")
通过以上步骤,可以实现定时数据抓取、去重和缺失数据处理的高效自动化流程。根据具体需求,可以进一步优化和扩展。