有一张表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?

Sherwin.Wei Lv7

有一张表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?

回答重点

题干没有告知峰值的统计单位,可以直接询问面试官,原理都是一样的。本答案以秒作为单位,统计每秒的最大值。

我们要统计每秒钟内的最大并发流量,也就是在某一秒内有多少个事件处于活动状态(即时间段的重叠),可以使用差分数组扫描线思想来实现。

我们可以通过将每个事件活动的开始时间和结束时间记录为增量(开始时流量 +1,结束时流量 -1),并通过扫描线的方式对每一秒进行累加,最终得到每秒的并发流量。

1
2
3
4
5
CREATE TABLE events (
id INT,
start_time DATETIME,
end_time DATETIME
);

假设 start_time 和 end_time 记录的时间单位就是秒,使用一个差分数组(增量数组)来记录每秒的流量变化。具体来说:

  • 我们将每个事件的开始和结束时间处理成时间点(例如:start_time 对应 +1 增加并发,end_time + 1 对应 -1 减少并发),并存储在一个列表中
  • 然后我们对这些时间点进行排序,并计算每个时间点的并发变化,最终找出最大并发数

假设我们有两个事件:

  • 事件1:从 2024-12-05 10:00:002024-12-05 10:00:30
  • 事件2:从 2024-12-05 10:00:012024-12-05 10:00:50

我们关心的是这些事件在某些时间点的重叠情况(即并发数),记录每个事件的开始和结束时刻,增减并发数。

时间戳变化图:

1
2
3
4
时间点     | 10:00:00   | 10:00:01   | 10:00:30  | 10:00:50  |
事件1 | 开始 -> +1 | | 结束 -> -1 | |
事件2 | | 开始 -> +1 | | 结束 -> -1 |
并发数 | 1 | 2 | 1 | 0 |

2024-12-05 10:00:01 时并发数达到了最大值 2,因为此时事件1和事件2同时在进行。

代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.*;

public class EventConcurrency {

public static class Event {
String id;
Date startTime;
Date endTime;

public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}

public static class EventPoint {
Date timestamp;
int change; // +1 for start, -1 for end + 1 second

public EventPoint(Date timestamp, int change) {
this.timestamp = timestamp;
this.change = change;
}
}

public static void main(String[] args) {
// mock 数据
List<Event> events = Arrays.asList(
new Event("1", parseDate("2024-12-05 10:00:00"), parseDate("2024-12-05 10:00:30")),
new Event("2", parseDate("2024-12-05 10:00:01"), parseDate("2024-12-05 10:00:50"))
);

// 获取最大并发量和时间点
Map.Entry<Date, Integer> result = getMaxConcurrency(events);

if (result != null) {
System.out.println("Max concurrency: " + result.getValue() + " at time: " + formatDate(result.getKey()));
} else {
System.out.println("No events found.");
}
}

public static Map.Entry<Date, Integer> getMaxConcurrency(List<Event> events) {
List<EventPoint> eventPoints = new ArrayList<>();


for (Event event : events) {
eventPoints.add(new EventPoint(event.startTime, 1)); // Start time: +1 concurrency
eventPoints.add(new EventPoint(addSecond(event.endTime), -1)); // End time + 1: -1 concurrency
}

// 通过时间排序
eventPoints.sort(Comparator.comparing((EventPoint p) -> p.timestamp));

int currentConcurrency = 0;
int maxConcurrency = 0;
Date maxConcurrencyTime = null;

// 遍历已排序的事件点以查找最大并发性
for (EventPoint point : eventPoints) {
currentConcurrency += point.change;
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTime = point.timestamp;
}
}

return new AbstractMap.SimpleEntry<>(maxConcurrencyTime, maxConcurrency);
}


public static Date parseDate(String dateStr) {
try {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr);
} catch (java.text.ParseException e) {
e.printStackTrace();
return null;
}
}

public static String formatDate(Date date) {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
}

// +1 秒
public static Date addSecond(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.SECOND, 1);
return cal.getTime();
}
}

理解上面的思路后,我们再来看看题干,表中有 5000w 数据,所以需要性能优化,而不是一次性加载所有的数据到内存中。

性能优化

对于 5000万条数据的规模,直接查询和处理可能会非常慢。为了提高效率,可以考虑以下优化:

  • 索引:在 start_time 和 end_time 字段上创建索引,以加速查询。
  • 按时间范围查询:如果事件按时间分布较均匀,按时间范围(例如每天)分批次查询会更高效。
  • 数据据分批加载:除了时间范围,也可以分页加载处理
  • 减少内存占用:利用 Map 来存储每个时间戳的增减信息,而不需要一次性存储所有时间点。
  • 多线程优化:可以使用多线程并行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.sql.*;
import java.text.SimpleDateFormat;

public class OptimizedEventConcurrency {

public static class Event {
String id;
Date startTime;
Date endTime;

public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}

public static void main(String[] args) {
// 数据库连接(示例中为模拟)
String url = "jdbc:mysql://localhost:3306/events_db";
String user = "root";
String password = "password";

// 每次查询的分页大小(每次加载的记录数)
int pageSize = 10000;
int currentPage = 1;

// 使用 ConcurrentSkipListMap 来保证时间戳的线程安全排序
Map<Long, Integer> timestampChanges = new ConcurrentSkipListMap<>();

// 数据库连接(用于读取事件数据)
try (Connection conn = DriverManager.getConnection(url, user, password)) {

// 假设总记录有 5000w 条,分页加载
while (true) {
List<Event> events = fetchEventsFromDatabase(conn, currentPage, pageSize);
if (events.isEmpty()) break; // 没有更多的事件数据,退出循环

// 使用并行流处理事件,减少处理时间
processEventsInParallel(events, timestampChanges);

currentPage++; // 下一页
}

// 计算所有事件的最大并发数
Map.Entry<Long, Integer> result = getMaxConcurrency(timestampChanges);
if (result != null) {
System.out.println("最大并发数: " + result.getValue() + " 发生在时间戳: " + formatTimestamp(result.getKey()));
} else {
System.out.println("没有找到事件数据。");
}
} catch (SQLException e) {
e.printStackTrace();
}
}

// 从数据库中获取事件数据,分页查询
public static List<Event> fetchEventsFromDatabase(Connection conn, int page, int pageSize) throws SQLException {
String query = "SELECT id, start_time, end_time FROM events LIMIT ?, ?";
try (PreparedStatement ps = conn.prepareStatement(query)) {
ps.setInt(1, (page - 1) * pageSize);
ps.setInt(2, pageSize);

try (ResultSet rs = ps.executeQuery()) {
List<Event> events = new ArrayList<>();
while (rs.next()) {
String id = rs.getString("id");
Date startTime = rs.getTimestamp("start_time");
Date endTime = rs.getTimestamp("end_time");
events.add(new Event(id, startTime, endTime));
}
return events;
}
}
}

// 使用并行流处理事件并更新时间戳变化
public static void processEventsInParallel(List<Event> events, Map<Long, Integer> timestampChanges) {
events.parallelStream().forEach(event -> {
// 对开始时间添加 +1
long startTimestamp = event.startTime.getTime() / 1000;
timestampChanges.merge(startTimestamp, 1, Integer::sum);

// 对结束时间(加1秒)添加 -1
long endTimestamp = (event.endTime.getTime() + 1000) / 1000; // 结束时间 + 1 秒
timestampChanges.merge(endTimestamp, -1, Integer::sum);
});
}

// 从时间戳变化映射中计算最大并发数
public static Map.Entry<Long, Integer> getMaxConcurrency(Map<Long, Integer> timestampChanges) {
int currentConcurrency = 0;
int maxConcurrency = 0;
long maxConcurrencyTimestamp = -1;

// 按时间戳排序,计算并发数
for (Map.Entry<Long, Integer> entry : timestampChanges.entrySet()) {
currentConcurrency += entry.getValue();
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTimestamp = entry.getKey();
}
}

return maxConcurrency > 0 ? new AbstractMap.SimpleEntry<>(maxConcurrencyTimestamp, maxConcurrency) : null;
}

// 将时间戳格式
public static String formatTimestamp(long timestamp) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date(timestamp * 1000));
}
}

代码的改造如下:

  • 分页查询数据库,避免一次性加载所有数据,减少内存压力
  • 使用 parallelStream() 来并行处理每批数据,充分利用多核 CPU 提高性能
  • ConcurrentSkipListMap 保证线程安全和数据的有序性

扩展知识

差分数组(Differential Array)

差分数组是一种通过记录区间增量来高效地求解区间问题的数据结构,广泛用于处理一些涉及区间的计算问题,尤其是在需要多次区间更新和区间求和的场景中。它能将原本复杂的区间操作转化为简单的点操作,从而提高效率。

基本思想

1)差分数组 是一种记录每个区间开始和结束时增量的数据结构,通过对差分数组的累加,恢复出原数组。

2)对于一个数组 arr[],差分数组 diff[] 的定义是:

  • diff[i] = arr[i] - arr[i-1] (对于第一个元素 arr[0]diff[0] = arr[0])
  • 通过差分数组 diff[],可以在 O(1) 时间内更新 arr[] 的任何区间操作(加法、减法等)。

例子:如何使用差分数组来实现区间更新

假设我们有一个数组 arr[],并希望对其中的一些区间进行加法操作。例如,我们希望对区间 [l, r] 的每个元素加上一个常数值 k

原数组和差分数组:

1
2
arr = [0, 0, 0, 0, 0]    // 初始数组
diff = [0, 0, 0, 0, 0] // 差分数组

[2, 4] 区间加 5

  • 在差分数组中:diff[2] += 5diff[5] -= 5(注意diff[5]越界,表示这个区间的结束,实际上应为diff[r+1]
    1
    diff = [0, 0, 5, 0, 0]  // 差分数组

恢复 arr[]
通过差分数组,我们可以恢复原始数组:

  • arr[0] = diff[0]
  • arr[1] = diff[1] + arr[0]
  • arr[2] = diff[2] + arr[1]
  • arr[3] = diff[3] + arr[2]
  • arr[4] = diff[4] + arr[3]

恢复后,arr[] 的值为 [0, 0, 5, 5, 5],即区间 [2, 4] 被加上了 5。

差分数组的关键在于记录增量,在更新时,直接在差分数组中修改该区间的起始点和结束点的值,通过对差分数组的累加即可恢复原数组的值。

扫描线算法

扫描线算法(Sweep Line Algorithm)是一种在计算几何、计算机图形学以及一些优化问题中广泛使用的算法。其基本思想是:通过将问题的二维空间按照某一方向扫描,并实时更新状态,从而高效解决问题

扫描线算法通过“扫描”整个问题空间,在每个扫描位置更新数据结构(比如维护一个排序的事件队列)来处理这些事件点。这个方法比直接计算每对元素之间的关系要高效得多,尤其是在处理大量数据时。

常见应用

  1. 动态区间问题:例如计算一个时间段内的活动数量,或者计算某个时间段的最大并发请求数。
  2. 区间求和:可以在O(1)时间内实现区间范围的累加和查询。
  3. 图形处理:例如计算矩形重叠区域的面积问题,扫描线算法也广泛应用于计算几何。
Comments
On this page
有一张表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?