有一张表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?
有一张表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?
回答重点
题干没有告知峰值的统计单位,可以直接询问面试官,原理都是一样的。本答案以秒作为单位,统计每秒的最大值。
我们要统计每秒钟内的最大并发流量,也就是在某一秒内有多少个事件处于活动状态(即时间段的重叠),可以使用差分数组和扫描线思想来实现。
我们可以通过将每个事件活动的开始时间和结束时间记录为增量(开始时流量 +1,结束时流量 -1),并通过扫描线的方式对每一秒进行累加,最终得到每秒的并发流量。
1 2 3 4 5
| CREATE TABLE events ( id INT, start_time DATETIME, end_time DATETIME );
|
假设 start_time 和 end_time 记录的时间单位就是秒,使用一个差分数组(增量数组)来记录每秒的流量变化。具体来说:
- 我们将每个事件的开始和结束时间处理成时间点(例如:start_time 对应 +1 增加并发,end_time + 1 对应 -1 减少并发),并存储在一个列表中
- 然后我们对这些时间点进行排序,并计算每个时间点的并发变化,最终找出最大并发数
假设我们有两个事件:
- 事件1:从
2024-12-05 10:00:00 到 2024-12-05 10:00:30
- 事件2:从
2024-12-05 10:00:01 到 2024-12-05 10:00:50
我们关心的是这些事件在某些时间点的重叠情况(即并发数),记录每个事件的开始和结束时刻,增减并发数。
时间戳变化图:
1 2 3 4
| 时间点 | 10:00:00 | 10:00:01 | 10:00:30 | 10:00:50 | 事件1 | 开始 -> +1 | | 结束 -> -1 | | 事件2 | | 开始 -> +1 | | 结束 -> -1 | 并发数 | 1 | 2 | 1 | 0 |
|
2024-12-05 10:00:01 时并发数达到了最大值 2,因为此时事件1和事件2同时在进行。
代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import java.util.*;
public class EventConcurrency { public static class Event { String id; Date startTime; Date endTime; public Event(String id, Date startTime, Date endTime) { this.id = id; this.startTime = startTime; this.endTime = endTime; } } public static class EventPoint { Date timestamp; int change; public EventPoint(Date timestamp, int change) { this.timestamp = timestamp; this.change = change; } }
public static void main(String[] args) { List<Event> events = Arrays.asList( new Event("1", parseDate("2024-12-05 10:00:00"), parseDate("2024-12-05 10:00:30")), new Event("2", parseDate("2024-12-05 10:00:01"), parseDate("2024-12-05 10:00:50")) ); Map.Entry<Date, Integer> result = getMaxConcurrency(events); if (result != null) { System.out.println("Max concurrency: " + result.getValue() + " at time: " + formatDate(result.getKey())); } else { System.out.println("No events found."); } }
public static Map.Entry<Date, Integer> getMaxConcurrency(List<Event> events) { List<EventPoint> eventPoints = new ArrayList<>();
for (Event event : events) { eventPoints.add(new EventPoint(event.startTime, 1)); eventPoints.add(new EventPoint(addSecond(event.endTime), -1)); } eventPoints.sort(Comparator.comparing((EventPoint p) -> p.timestamp)); int currentConcurrency = 0; int maxConcurrency = 0; Date maxConcurrencyTime = null;
for (EventPoint point : eventPoints) { currentConcurrency += point.change; if (currentConcurrency > maxConcurrency) { maxConcurrency = currentConcurrency; maxConcurrencyTime = point.timestamp; } } return new AbstractMap.SimpleEntry<>(maxConcurrencyTime, maxConcurrency); }
public static Date parseDate(String dateStr) { try { return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr); } catch (java.text.ParseException e) { e.printStackTrace(); return null; } } public static String formatDate(Date date) { return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date); } public static Date addSecond(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.add(Calendar.SECOND, 1); return cal.getTime(); } }
|
理解上面的思路后,我们再来看看题干,表中有 5000w 数据,所以需要性能优化,而不是一次性加载所有的数据到内存中。
性能优化
对于 5000万条数据的规模,直接查询和处理可能会非常慢。为了提高效率,可以考虑以下优化:
- 索引:在 start_time 和 end_time 字段上创建索引,以加速查询。
- 按时间范围查询:如果事件按时间分布较均匀,按时间范围(例如每天)分批次查询会更高效。
- 数据据分批加载:除了时间范围,也可以分页加载处理
- 减少内存占用:利用 Map 来存储每个时间戳的增减信息,而不需要一次性存储所有时间点。
- 多线程优化:可以使用多线程并行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| import java.util.*; import java.util.concurrent.*; import java.util.stream.*; import java.sql.*; import java.text.SimpleDateFormat;
public class OptimizedEventConcurrency { public static class Event { String id; Date startTime; Date endTime; public Event(String id, Date startTime, Date endTime) { this.id = id; this.startTime = startTime; this.endTime = endTime; } }
public static void main(String[] args) { String url = "jdbc:mysql://localhost:3306/events_db"; String user = "root"; String password = "password"; int pageSize = 10000; int currentPage = 1; Map<Long, Integer> timestampChanges = new ConcurrentSkipListMap<>(); try (Connection conn = DriverManager.getConnection(url, user, password)) { while (true) { List<Event> events = fetchEventsFromDatabase(conn, currentPage, pageSize); if (events.isEmpty()) break; processEventsInParallel(events, timestampChanges); currentPage++; } Map.Entry<Long, Integer> result = getMaxConcurrency(timestampChanges); if (result != null) { System.out.println("最大并发数: " + result.getValue() + " 发生在时间戳: " + formatTimestamp(result.getKey())); } else { System.out.println("没有找到事件数据。"); } } catch (SQLException e) { e.printStackTrace(); } }
public static List<Event> fetchEventsFromDatabase(Connection conn, int page, int pageSize) throws SQLException { String query = "SELECT id, start_time, end_time FROM events LIMIT ?, ?"; try (PreparedStatement ps = conn.prepareStatement(query)) { ps.setInt(1, (page - 1) * pageSize); ps.setInt(2, pageSize); try (ResultSet rs = ps.executeQuery()) { List<Event> events = new ArrayList<>(); while (rs.next()) { String id = rs.getString("id"); Date startTime = rs.getTimestamp("start_time"); Date endTime = rs.getTimestamp("end_time"); events.add(new Event(id, startTime, endTime)); } return events; } } }
public static void processEventsInParallel(List<Event> events, Map<Long, Integer> timestampChanges) { events.parallelStream().forEach(event -> { long startTimestamp = event.startTime.getTime() / 1000; timestampChanges.merge(startTimestamp, 1, Integer::sum); long endTimestamp = (event.endTime.getTime() + 1000) / 1000; timestampChanges.merge(endTimestamp, -1, Integer::sum); }); }
public static Map.Entry<Long, Integer> getMaxConcurrency(Map<Long, Integer> timestampChanges) { int currentConcurrency = 0; int maxConcurrency = 0; long maxConcurrencyTimestamp = -1;
for (Map.Entry<Long, Integer> entry : timestampChanges.entrySet()) { currentConcurrency += entry.getValue(); if (currentConcurrency > maxConcurrency) { maxConcurrency = currentConcurrency; maxConcurrencyTimestamp = entry.getKey(); } }
return maxConcurrency > 0 ? new AbstractMap.SimpleEntry<>(maxConcurrencyTimestamp, maxConcurrency) : null; }
public static String formatTimestamp(long timestamp) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(new Date(timestamp * 1000)); } }
|
代码的改造如下:
- 分页查询数据库,避免一次性加载所有数据,减少内存压力
- 使用 parallelStream() 来并行处理每批数据,充分利用多核 CPU 提高性能
- ConcurrentSkipListMap 保证线程安全和数据的有序性
扩展知识
差分数组(Differential Array)
差分数组是一种通过记录区间增量来高效地求解区间问题的数据结构,广泛用于处理一些涉及区间的计算问题,尤其是在需要多次区间更新和区间求和的场景中。它能将原本复杂的区间操作转化为简单的点操作,从而提高效率。
基本思想
1)差分数组 是一种记录每个区间开始和结束时增量的数据结构,通过对差分数组的累加,恢复出原数组。
2)对于一个数组 arr[],差分数组 diff[] 的定义是:
diff[i] = arr[i] - arr[i-1] (对于第一个元素 arr[0],diff[0] = arr[0])
- 通过差分数组
diff[],可以在 O(1) 时间内更新 arr[] 的任何区间操作(加法、减法等)。
例子:如何使用差分数组来实现区间更新
假设我们有一个数组 arr[],并希望对其中的一些区间进行加法操作。例如,我们希望对区间 [l, r] 的每个元素加上一个常数值 k。
原数组和差分数组:
1 2
| arr = [0, 0, 0, 0, 0] // 初始数组 diff = [0, 0, 0, 0, 0] // 差分数组
|
对 [2, 4] 区间加 5:
- 在差分数组中:
diff[2] += 5,diff[5] -= 5(注意diff[5]越界,表示这个区间的结束,实际上应为diff[r+1])1
| diff = [0, 0, 5, 0, 0] // 差分数组
|
恢复 arr[]:
通过差分数组,我们可以恢复原始数组:
arr[0] = diff[0]
arr[1] = diff[1] + arr[0]
arr[2] = diff[2] + arr[1]
arr[3] = diff[3] + arr[2]
arr[4] = diff[4] + arr[3]
恢复后,arr[] 的值为 [0, 0, 5, 5, 5],即区间 [2, 4] 被加上了 5。
差分数组的关键在于记录增量,在更新时,直接在差分数组中修改该区间的起始点和结束点的值,通过对差分数组的累加即可恢复原数组的值。
扫描线算法
扫描线算法(Sweep Line Algorithm)是一种在计算几何、计算机图形学以及一些优化问题中广泛使用的算法。其基本思想是:通过将问题的二维空间按照某一方向扫描,并实时更新状态,从而高效解决问题。
扫描线算法通过“扫描”整个问题空间,在每个扫描位置更新数据结构(比如维护一个排序的事件队列)来处理这些事件点。这个方法比直接计算每对元素之间的关系要高效得多,尤其是在处理大量数据时。
常见应用
- 动态区间问题:例如计算一个时间段内的活动数量,或者计算某个时间段的最大并发请求数。
- 区间求和:可以在O(1)时间内实现区间范围的累加和查询。
- 图形处理:例如计算矩形重叠区域的面积问题,扫描线算法也广泛应用于计算几何。