def process_data_and_generate_report(
    data_source: str,
    output_format: str,
    include_summary: bool,
    include_charts: bool,
    validate_input: bool,
    log_details: bool,
) -> dict:
    """Load records from a CSV/JSON file, normalize them, and render a report.

    The pipeline is: optional input validation -> load -> clean/convert each
    record -> optional summary -> render -> optional log entry.

    Args:
        data_source: Path of the input file. A path ending in ``.csv`` is
            parsed as CSV with a header row; anything else is parsed as JSON
            and is expected to hold a list of objects.
        output_format: ``'html'``, ``'json'`` or ``'csv'``; any other value
            selects the plain-text report.
        include_summary: When true, compute record count and totals.
        include_charts: Forwarded to ``generate_html_report`` (HTML only).
        validate_input: When true, check that the path is non-empty, exists,
            and has a supported extension before reading.
        log_details: When true, append one JSON line describing this run to
            ``processing_log.json`` in the current working directory.

    Returns:
        A dict with two keys: ``'format'`` (the format actually used) and
        ``'content'`` (the rendered report).

    Raises:
        ValueError: Validation is on and the path is empty or has an
            unsupported extension; or a ``date``/``amount``/``price``/
            ``quantity`` value cannot be converted.
        FileNotFoundError: Validation is on and the path does not exist.
    """
    if validate_input:
        _report_validate_source(data_source)

    raw_records = _report_load_records(data_source)
    processed_data = [_report_clean_record(record) for record in raw_records]

    summary = _report_build_summary(processed_data) if include_summary else {}
    report = _report_render(processed_data, summary, output_format, include_charts)

    if log_details:
        _report_append_log(
            data_source, len(processed_data), output_format, include_summary
        )
    return report


def _report_validate_source(data_source):
    """Raise if the path is empty, missing on disk, or not .csv/.json."""
    if not data_source:
        raise ValueError("数据源不能为空")
    if not os.path.exists(data_source):
        raise FileNotFoundError(f"文件不存在: {data_source}")
    if not data_source.endswith(('.csv', '.json')):
        raise ValueError("只支持 CSV 或 JSON 格式")


def _report_load_records(data_source):
    """Read the input file and return its records (dicts keyed by field name)."""
    if data_source.endswith('.csv'):
        # DictReader maps every row onto the header names so that the cleaning
        # step can iterate key/value pairs; plain csv.reader rows are lists
        # and have no .items(). newline='' lets the csv module handle quoted
        # line breaks itself.
        with open(data_source, 'r', encoding='utf-8', newline='') as f:
            return list(csv.DictReader(f))
    # Everything that is not .csv is treated as JSON.
    with open(data_source, 'r', encoding='utf-8') as f:
        return json.load(f)


def _report_to_number(value):
    """Convert a string to float; return any other value unchanged."""
    return float(value) if isinstance(value, str) else value


def _report_clean_record(item):
    """Normalize one record: blank out None, strip strings, convert types."""
    cleaned_item = {}
    for key, value in item.items():
        if value is None:
            cleaned_item[key] = ''
        elif isinstance(value, str):
            cleaned_item[key] = value.strip()
        else:
            cleaned_item[key] = value

    # Type conversions. An empty or malformed date/amount raises ValueError.
    if 'date' in cleaned_item:
        cleaned_item['date'] = datetime.strptime(cleaned_item['date'], '%Y-%m-%d')
    if 'amount' in cleaned_item:
        cleaned_item['amount'] = float(cleaned_item['amount'])

    # Derived field. CSV values arrive as text, so string operands are
    # converted first; numeric operands (e.g. from JSON) are used as-is.
    if 'price' in cleaned_item and 'quantity' in cleaned_item:
        cleaned_item['total'] = (
            _report_to_number(cleaned_item['price'])
            * _report_to_number(cleaned_item['quantity'])
        )
    return cleaned_item


def _report_build_summary(processed_data):
    """Compute record count and amount/total sums; empty dict for no records."""
    if not processed_data:
        return {}

    record_count = len(processed_data)
    first_record = processed_data[0]
    summary = {'total_records': record_count}

    # Which totals are reported is decided by the first record only; records
    # lacking the field contribute 0 to the sum.
    if 'amount' in first_record:
        total_amount = sum(item.get('amount', 0) for item in processed_data)
        summary['total_amount'] = total_amount
        summary['average_amount'] = total_amount / record_count
    if 'total' in first_record:
        summary['grand_total'] = sum(item.get('total', 0) for item in processed_data)
    return summary


def _report_render(processed_data, summary, output_format, include_charts):
    """Render the report in the requested format, falling back to plain text."""
    if output_format == 'html':
        content = generate_html_report(processed_data, summary, include_charts)
        return {'format': 'html', 'content': content}
    if output_format == 'json':
        payload = {
            'data': processed_data,
            'summary': summary,
            'timestamp': datetime.now().isoformat(),
        }
        # default=str serializes the datetime objects produced by cleaning.
        content = json.dumps(payload, indent=2, default=str)
        return {'format': 'json', 'content': content}
    if output_format == 'csv':
        content = generate_csv_report(processed_data, summary)
        return {'format': 'csv', 'content': content}
    # Any unrecognized format is reported as text.
    return {'format': 'text', 'content': generate_text_report(processed_data, summary)}


def _report_append_log(data_source, records_processed, output_format, include_summary):
    """Append one JSON line describing this run to processing_log.json."""
    log_entry = {
        'timestamp': datetime.now().isoformat(),
        'data_source': data_source,
        'records_processed': records_processed,
        'output_format': output_format,
        'summary_included': include_summary,
    }
    with open('processing_log.json', 'a', encoding='utf-8') as log_file:
        json.dump(log_entry, log_file)
        log_file.write('\n')