1. Observable 詳細用法
Observable 是 RxJS 的核心概念,代表一個可觀察的數據流。
創建和訂閱 Observable
import { Observable } from "rxjs";// 1. 創建Observable
const myObservable = new Observable(subscriber => {// 發出三個值subscriber.next('第一個值');subscriber.next('第二個值');subscriber.next('第三個值');// 模擬異步操作setTimeout(() => {subscriber.next('異步值');subscriber.complete(); // 完成流}, 1000);// 可選的清理邏輯return () => {console.log('Observable被取消訂閱');};
});// 2. 訂閱Observable
const subscription = myObservable.subscribe({next: value => console.log('收到值:', value),error: err => console.error('發生錯誤:', err),complete: () => console.log('流已完成')
});// 3. 取消訂閱 (通常在組件銷毀時調用)
setTimeout(() => {subscription.unsubscribe();
}, 2000);/* 輸出順序:
收到值: 第一個值
收到值: 第二個值
收到值: 第三個值
(等待1秒)
收到值: 異步值
流已完成
(再等待1秒)
Observable被取消訂閱
*/
2. of 操作符詳細用法
of
用于創建一個會立即發出給定參數的 Observable。
基本示例
import { of } from "rxjs";// 發出固定值
of('蘋果', '香蕉', '橙子').subscribe({next: fruit => console.log('水果:', fruit),complete: () => console.log('水果列表結束')
});/* 輸出:
水果: 蘋果
水果: 香蕉
水果: 橙子
水果列表結束
*/// 發出不同類型的數據
of('字符串',123,true,{name: 'Alice'},[1, 2, 3],function hello() { return 'world'; }
).subscribe(val => console.log('收到的值:', val));// 實際應用:模擬API返回
function mockApiCall() {return of({id: 1, name: '用戶1'});
}mockApiCall().subscribe(user => {console.log('用戶數據:', user);
});
3. from 操作符詳細用法
from
可以將多種數據類型轉換為 Observable。
各種來源的轉換
import { from } from "rxjs";// 1. 從數組創建
from([10, 20, 30]).subscribe(num => console.log('數字:', num));// 2. 從Promise創建
const promise = fetch('https://api.example.com/data').then(response => response.json());from(promise).subscribe(data => {console.log('API數據:', data);
});// 3. 從字符串創建 (每個字符作為單獨的值)
from('Hello').subscribe(char => console.log(char));
// 輸出: H, e, l, l, o// 4. 從Map或Set創建
const myMap = new Map();
myMap.set('name', 'Alice');
myMap.set('age', 25);from(myMap).subscribe(entry => {console.log('Map條目:', entry);// 輸出: ['name', 'Alice'], ['age', 25]
});// 5. 實際應用:批量處理數組
const userIds = [1, 2, 3, 4];from(userIds).subscribe(id => {console.log('處理用戶ID:', id);// 這里可以調用API獲取每個用戶的詳細信息
});
4. forkJoin 操作符詳細用法
forkJoin
用于并行執行多個 Observable,等待它們全部完成。
完整示例
import { forkJoin, of, from, throwError } from "rxjs";
import { delay, catchError } from "rxjs/operators";// 模擬API函數
function getUser(id) {return of({ id, name: `用戶${id}` }).pipe(delay(1000));
}function getUserPosts(userId) {const posts = [{ id: 1, title: '帖子1' },{ id: 2, title: '帖子2' }];return of(posts).pipe(delay(1500));
}function getUserComments(userId) {return from(fetch(`https://api.example.com/users/${userId}/comments`));
}// 1. 基本用法
forkJoin([getUser(1),getUserPosts(1),getUserComments(1).pipe(catchError(error => of(`獲取評論失敗: ${error.message}`)))
]).subscribe({next: ([user, posts, comments]) => {console.log('用戶:', user);console.log('帖子:', posts);console.log('評論:', comments);},error: err => console.error('整體失敗:', err),complete: () => console.log('所有請求完成')
});// 2. 對象形式更清晰
forkJoin({user: getUser(1),posts: getUserPosts(1),comments: getUserComments(1).pipe(catchError(error => of([])) // 錯誤時返回空數組)
}).subscribe({next: result => {console.log('整合結果:', result);// 結構: { user: {...}, posts: [...], comments: [...] }}
});// 3. 錯誤處理演示
forkJoin({success: of('成功'),failure: throwError(new Error('出錯了'))
}).pipe(catchError(error => {console.log('捕獲到錯誤:', error);return of({ success: null, failure: error.message });})
).subscribe(result => {console.log('最終結果:', result);
});// 4. 實際應用:并行請求多個API
function loadDashboardData() {return forkJoin({user: getUser(1),notifications: from(fetch('/api/notifications')),settings: from(fetch('/api/settings'))});
}loadDashboardData().subscribe(data => {console.log('儀表盤數據:', data);// 更新UI...
});
綜合實戰示例
import { forkJoin, from, of } from "rxjs";
import { map, mergeMap, catchError } from "rxjs/operators";// 模擬API服務
class ApiService {static getUsers() {const users = [{ id: 1, name: 'Alice' },{ id: 2, name: 'Bob' }];return of(users).pipe(delay(500));}static getUserDetails(userId) {const details = {1: { age: 25, email: 'alice@example.com' },2: { age: 30, email: 'bob@example.com' }};return of(details[userId]).pipe(delay(300));}static getUserPosts(userId) {const posts = {1: [{ id: 101, title: 'Alice的第一篇帖子' }],2: [{ id: 201, title: 'Bob的帖子' }, { id: 202, title: 'Bob的另一篇帖子' }]};return of(posts[userId] || []).pipe(delay(700));}
}// 1. 獲取所有用戶及其詳細信息和帖子
ApiService.getUsers().pipe(mergeMap(users => {// 為每個用戶創建請求數組const userRequests = users.map(user => forkJoin({details: ApiService.getUserDetails(user.id),posts: ApiService.getUserPosts(user.id)}).pipe(map(data => ({ ...user, ...data }))));// 并行執行所有用戶請求return forkJoin(userRequests);})
).subscribe({next: completeUsers => {console.log('完整用戶數據:', completeUsers);/* 輸出:[{id: 1,name: 'Alice',details: { age: 25, email: 'alice@example.com' },posts: [{ id: 101, title: 'Alice的第一篇帖子' }]},{id: 2,name: 'Bob',details: { age: 30, email: 'bob@example.com' },posts: [{...}, {...}]}]*/},error: err => console.error('獲取用戶數據失敗:', err)
});// 2. 實際應用:表單提交后并行更新多個資源
function updateResources(userData, postsData, settingsData) {return forkJoin({user: from(fetch('/api/user', {method: 'PUT',body: JSON.stringify(userData)})),posts: from(fetch('/api/posts', {method: 'POST',body: JSON.stringify(postsData)})),settings: from(fetch('/api/settings', {method: 'PATCH',body: JSON.stringify(settingsData)}))}).pipe(map(responses => ({user: responses.user.json(),posts: responses.posts.json(),settings: responses.settings.json()})));
}// 使用示例
updateResources({ name: '新名字' },[{ title: '新帖子' }],{ theme: 'dark' }
).subscribe({next: results => {console.log('所有資源更新成功:', results);},error: err => {console.error('更新失敗:', err);// 顯示錯誤提示}
});
這些示例展示了 RxJS 操作符在實際開發中的典型用法。關鍵點:
Observable
是基礎,代表數據流of
用于創建簡單的同步流from
用于從各種數據源創建流forkJoin
用于并行執行多個 Observable 并合并結果